1use scirs2_core::ndarray::{s, Array1, Array2, Axis};
2use scirs2_core::random::rngs::StdRng;
3use scirs2_core::random::{thread_rng, Rng, SeedableRng};
4use scirs2_core::StandardNormal;
5use sklears_core::error::{Result, SklearsError};
6use std::collections::VecDeque;
7
/// Policy for retaining samples in the streaming buffer and for evicting
/// old samples as new ones arrive.
#[derive(Debug, Clone)]
pub enum BufferStrategy {
    /// Keep at most this many samples, evicting the oldest first (FIFO).
    FixedSize(usize),
    /// Cap the buffer at `size` samples AND drop samples older than
    /// `time_window` (same units as `StreamingSample::timestamp`).
    SlidingWindow { size: usize, time_window: f64 },
    /// Uniform reservoir sampling with the given capacity: once full, each
    /// new sample replaces a random resident with probability capacity/seen.
    ReservoirSampling(usize),
    /// Multiply every resident's weight by `alpha` on each insertion and
    /// drop samples whose weight falls below `min_weight`.
    ExponentialDecay { alpha: f64, min_weight: f64 },
    /// Keep `capacity` samples; a newcomer replaces the least-important
    /// resident only if its importance exceeds that one's by `threshold`.
    ImportanceWeighted { capacity: usize, threshold: f64 },
}
28
/// How often a streaming model refreshes itself from its buffer.
#[derive(Debug, Clone)]
pub enum UpdateFrequency {
    /// Refresh after every ingested sample.
    PerSample,
    /// Refresh once this many samples have arrived since the last update.
    BatchSize(usize),
    /// Refresh after a fixed time interval.
    /// NOTE(review): currently approximated by a fixed sample count (100)
    /// in `StreamingRBFSampler::should_update` — confirm intended semantics.
    TimeInterval(f64),
    /// Refresh when approximation error exceeds this threshold.
    /// NOTE(review): currently approximated by a fixed sample count (50).
    ErrorThreshold(f64),
    /// Adaptive interval bounded by `min`/`max`, starting at `initial`.
    /// NOTE(review): only `initial` is consulted at present.
    Adaptive {
        /// Interval used while no adaptation signal is available.
        initial: usize,
        /// Upper bound on the adaptive interval.
        max: usize,
        /// Lower bound on the adaptive interval.
        min: usize,
    },
}
47
/// How buffered samples lose influence over time during model updates.
#[derive(Debug, Clone)]
pub enum ForgettingMechanism {
    /// Weights are left untouched.
    None,
    /// Multiply each sample weight by `1.0 - rate` per update.
    LinearDecay(f64),
    /// Multiply each sample weight by this factor per update.
    ExponentialDecay(f64),
    /// Drop all samples older than this age relative to the newest sample.
    AbruptForgetting(f64),
    /// Multiply weights by a logistic factor of sample age: close to 1 for
    /// samples younger than `midpoint`, close to 0 for much older ones, with
    /// the transition slope set by `steepness`.
    SigmoidDecay { steepness: f64, midpoint: f64 },
}
62
/// Configuration shared by the streaming kernel approximators in this file.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// How the sample buffer retains and evicts samples.
    pub buffer_strategy: BufferStrategy,
    /// When the model is refreshed from the buffer.
    pub update_frequency: UpdateFrequency,
    /// How buffered samples lose weight over time.
    pub forgetting_mechanism: ForgettingMechanism,
    /// Optional soft memory budget in megabytes.
    /// NOTE(review): not consulted anywhere in this file — confirm whether
    /// enforcement lives elsewhere.
    pub max_memory_mb: Option<usize>,
    /// Whether the number of components may adapt over time.
    /// NOTE(review): not consulted in this file.
    pub adaptive_components: bool,
    /// Whether approximation-quality statistics are tracked.
    /// NOTE(review): not consulted in this file.
    pub quality_monitoring: bool,
    /// Enables the drift-detection path in `StreamingRBFSampler::detect_drift`.
    pub drift_detection: bool,
    /// Intended drift-score threshold.
    /// NOTE(review): `FeatureStatistics::detect_drift` currently hard-codes
    /// 0.1 — verify this field is wired through where drift is evaluated.
    pub concept_drift_threshold: f64,
}
83
84impl Default for StreamingConfig {
85 fn default() -> Self {
86 Self {
87 buffer_strategy: BufferStrategy::FixedSize(1000),
88 update_frequency: UpdateFrequency::BatchSize(100),
89 forgetting_mechanism: ForgettingMechanism::ExponentialDecay(0.99),
90 max_memory_mb: Some(100),
91 adaptive_components: true,
92 quality_monitoring: true,
93 drift_detection: false,
94 concept_drift_threshold: 0.1,
95 }
96 }
97}
98
/// One observation flowing through a streaming kernel approximator.
#[derive(Debug, Clone)]
pub struct StreamingSample {
    /// Feature vector of the observation.
    pub data: Array1<f64>,
    /// Arrival time (unit-agnostic; the fit paths use the row index).
    pub timestamp: f64,
    /// Buffer weight, decayed by forgetting mechanisms (starts at 1.0).
    pub weight: f64,
    /// Importance used by `BufferStrategy::ImportanceWeighted` (starts at 1.0).
    pub importance: f64,
    /// Optional supervised target.
    pub label: Option<f64>,
}
113
114impl StreamingSample {
115 pub fn new(data: Array1<f64>, timestamp: f64) -> Self {
116 Self {
117 data,
118 timestamp,
119 weight: 1.0,
120 importance: 1.0,
121 label: None,
122 }
123 }
124
125 pub fn with_weight(mut self, weight: f64) -> Self {
126 self.weight = weight;
127 self
128 }
129
130 pub fn with_importance(mut self, importance: f64) -> Self {
131 self.importance = importance;
132 self
133 }
134
135 pub fn with_label(mut self, label: f64) -> Self {
136 self.label = Some(label);
137 self
138 }
139}
140
/// Streaming random-Fourier-feature approximator for the RBF kernel,
/// maintaining a sample buffer, running feature statistics, and optional
/// drift detection while data arrives incrementally.
pub struct StreamingRBFSampler {
    /// Number of random Fourier features produced per sample.
    n_components: usize,
    /// RBF kernel parameter (kernel exp(-gamma * ||x - y||^2)).
    gamma: f64,
    /// Streaming behavior: buffering, update cadence, forgetting, drift.
    config: StreamingConfig,
    /// Random projection matrix (n_components x n_features); None until fitted.
    weights: Option<Array2<f64>>,
    /// Random phase offsets in [0, 2*pi); None until fitted.
    bias: Option<Array1<f64>>,
    /// Retained samples, managed per `config.buffer_strategy`.
    buffer: VecDeque<StreamingSample>,
    /// Total samples ever ingested.
    sample_count: usize,
    /// Value of `sample_count` at the last model refresh.
    last_update: usize,
    /// Running statistics over the transformed feature space.
    feature_statistics: FeatureStatistics,
    /// Seed recorded by `with_random_state`, if any.
    random_state: Option<u64>,
    /// RNG driving weight/bias generation and reservoir sampling.
    rng: StdRng,
}
158
/// Running per-dimension statistics over transformed feature batches.
#[derive(Debug, Clone)]
pub struct FeatureStatistics {
    /// Running mean of each feature dimension.
    pub mean: Array1<f64>,
    /// Running variance of each feature dimension.
    pub variance: Array1<f64>,
    /// Smallest value seen per dimension (starts at +infinity).
    pub min: Array1<f64>,
    /// Largest value seen per dimension (starts at -infinity).
    pub max: Array1<f64>,
    /// Total number of feature rows folded in so far.
    pub update_count: usize,
    /// Reserved approximation-error metric.
    /// NOTE(review): never written in this file — confirm its producer.
    pub approximation_error: f64,
    /// Mean absolute shift of the running mean from the last drift check.
    pub drift_score: f64,
}
177
178impl FeatureStatistics {
179 pub fn new(n_components: usize) -> Self {
180 Self {
181 mean: Array1::zeros(n_components),
182 variance: Array1::zeros(n_components),
183 min: Array1::from_elem(n_components, f64::INFINITY),
184 max: Array1::from_elem(n_components, f64::NEG_INFINITY),
185 update_count: 0,
186 approximation_error: 0.0,
187 drift_score: 0.0,
188 }
189 }
190
191 pub fn update(&mut self, features: &Array2<f64>) {
192 let n_samples = features.nrows();
193
194 for i in 0..features.ncols() {
195 let col = features.column(i);
196 let new_mean = col.mean().unwrap_or(0.0);
197 let new_var = col.mapv(|x| (x - new_mean).powi(2)).mean().unwrap_or(0.0);
198 let new_min = col.iter().fold(f64::INFINITY, |a, &b| a.min(b));
199 let new_max = col.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
200
201 let old_count = self.update_count;
203 let new_count = old_count + n_samples;
204
205 if old_count == 0 {
206 self.mean[i] = new_mean;
207 self.variance[i] = new_var;
208 } else {
209 let alpha = n_samples as f64 / new_count as f64;
210 self.mean[i] = (1.0 - alpha) * self.mean[i] + alpha * new_mean;
211 self.variance[i] = (1.0 - alpha) * self.variance[i] + alpha * new_var;
212 }
213
214 self.min[i] = self.min[i].min(new_min);
215 self.max[i] = self.max[i].max(new_max);
216 }
217
218 self.update_count += n_samples;
219 }
220
221 pub fn detect_drift(&mut self, new_features: &Array2<f64>) -> bool {
222 let old_mean = self.mean.clone();
223 self.update(new_features);
224
225 let mean_shift = (&self.mean - &old_mean).mapv(f64::abs).sum();
227 self.drift_score = mean_shift / self.mean.len() as f64;
228
229 self.drift_score > 0.1 }
231}
232
233impl StreamingRBFSampler {
234 pub fn new(n_components: usize, gamma: f64) -> Self {
236 let rng = StdRng::from_seed(thread_rng().gen());
237 Self {
238 n_components,
239 gamma,
240 config: StreamingConfig::default(),
241 weights: None,
242 bias: None,
243 buffer: VecDeque::new(),
244 sample_count: 0,
245 last_update: 0,
246 feature_statistics: FeatureStatistics::new(n_components),
247 random_state: None,
248 rng,
249 }
250 }
251
252 pub fn with_config(mut self, config: StreamingConfig) -> Self {
254 self.config = config;
255 self
256 }
257
258 pub fn with_random_state(mut self, random_state: u64) -> Self {
260 self.random_state = Some(random_state);
261 self.rng = StdRng::seed_from_u64(random_state);
262 self
263 }
264
265 pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
267 let (_, n_features) = x.dim();
268
269 self.weights = Some(self.generate_weights(n_features)?);
271 self.bias = Some(self.generate_bias()?);
272
273 for (i, row) in x.rows().into_iter().enumerate() {
275 let sample = StreamingSample::new(row.to_owned(), i as f64);
276 self.add_sample(sample)?;
277 }
278
279 Ok(())
280 }
281
282 pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
284 if self.weights.is_none() {
286 let n_features = sample.data.len();
287 self.weights = Some(self.generate_weights(n_features)?);
288 self.bias = Some(self.generate_bias()?);
289 }
290
291 self.manage_buffer(sample)?;
293
294 self.sample_count += 1;
295
296 if self.should_update()? {
298 self.update_model()?;
299 self.last_update = self.sample_count;
300 }
301
302 Ok(())
303 }
304
305 pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
307 let weights = self
308 .weights
309 .as_ref()
310 .ok_or_else(|| SklearsError::NotFitted {
311 operation: "transform".to_string(),
312 })?;
313 let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
314 operation: "transform".to_string(),
315 })?;
316
317 self.compute_features(x, weights, bias)
318 }
319
320 pub fn transform_sample(&self, sample: &Array1<f64>) -> Result<Array1<f64>> {
322 let weights = self
323 .weights
324 .as_ref()
325 .ok_or_else(|| SklearsError::NotFitted {
326 operation: "transform_sample".to_string(),
327 })?;
328 let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
329 operation: "transform_sample".to_string(),
330 })?;
331
332 let projection = sample.dot(&weights.t()) + bias;
334 let norm_factor = (2.0 / self.n_components as f64).sqrt();
335
336 Ok(projection.mapv(|x| norm_factor * x.cos()))
337 }
338
339 pub fn buffer_stats(&self) -> (usize, f64, f64) {
341 let size = self.buffer.len();
342 let avg_weight = if size > 0 {
343 self.buffer.iter().map(|s| s.weight).sum::<f64>() / size as f64
344 } else {
345 0.0
346 };
347 let avg_importance = if size > 0 {
348 self.buffer.iter().map(|s| s.importance).sum::<f64>() / size as f64
349 } else {
350 0.0
351 };
352
353 (size, avg_weight, avg_importance)
354 }
355
356 pub fn feature_stats(&self) -> &FeatureStatistics {
358 &self.feature_statistics
359 }
360
361 pub fn detect_drift(&mut self, x: &Array2<f64>) -> Result<bool> {
363 if !self.config.drift_detection {
364 return Ok(false);
365 }
366
367 let features = self.transform(x)?;
368 Ok(self.feature_statistics.detect_drift(&features))
369 }
370
371 fn manage_buffer(&mut self, sample: StreamingSample) -> Result<()> {
373 match &self.config.buffer_strategy {
374 BufferStrategy::FixedSize(max_size) => {
375 if self.buffer.len() >= *max_size {
376 self.buffer.pop_front();
377 }
378 self.buffer.push_back(sample);
379 }
380 BufferStrategy::SlidingWindow { size, time_window } => {
381 let current_time = sample.timestamp;
383 while let Some(front) = self.buffer.front() {
384 if current_time - front.timestamp > *time_window {
385 self.buffer.pop_front();
386 } else {
387 break;
388 }
389 }
390
391 if self.buffer.len() >= *size {
393 self.buffer.pop_front();
394 }
395 self.buffer.push_back(sample);
396 }
397 BufferStrategy::ReservoirSampling(capacity) => {
398 if self.buffer.len() < *capacity {
399 self.buffer.push_back(sample);
400 } else {
401 let replace_idx = self.rng.gen_range(0..self.sample_count + 1);
402 if replace_idx < *capacity {
403 self.buffer[replace_idx] = sample;
404 }
405 }
406 }
407 BufferStrategy::ExponentialDecay { alpha, min_weight } => {
408 for existing_sample in &mut self.buffer {
410 existing_sample.weight *= alpha;
411 }
412
413 self.buffer.retain(|s| s.weight >= *min_weight);
415
416 self.buffer.push_back(sample);
417 }
418 BufferStrategy::ImportanceWeighted {
419 capacity,
420 threshold,
421 } => {
422 if self.buffer.len() < *capacity {
423 self.buffer.push_back(sample);
424 } else {
425 if let Some((min_idx, _)) =
427 self.buffer.iter().enumerate().min_by(|(_, a), (_, b)| {
428 a.importance.partial_cmp(&b.importance).unwrap()
429 })
430 {
431 if sample.importance > self.buffer[min_idx].importance + threshold {
432 self.buffer[min_idx] = sample;
433 }
434 }
435 }
436 }
437 }
438
439 Ok(())
440 }
441
442 fn should_update(&self) -> Result<bool> {
444 match &self.config.update_frequency {
445 UpdateFrequency::PerSample => Ok(true),
446 UpdateFrequency::BatchSize(batch_size) => {
447 Ok(self.sample_count - self.last_update >= *batch_size)
448 }
449 UpdateFrequency::TimeInterval(_time_interval) => {
450 Ok(self.sample_count - self.last_update >= 100)
452 }
453 UpdateFrequency::ErrorThreshold(_threshold) => {
454 Ok(self.sample_count - self.last_update >= 50)
456 }
457 UpdateFrequency::Adaptive {
458 initial,
459 max: _,
460 min: _,
461 } => Ok(self.sample_count - self.last_update >= *initial),
462 }
463 }
464
465 fn update_model(&mut self) -> Result<()> {
467 if self.buffer.is_empty() {
468 return Ok(());
469 }
470
471 let mut data_matrix = Array2::zeros((self.buffer.len(), self.buffer[0].data.len()));
473 for (i, sample) in self.buffer.iter().enumerate() {
474 data_matrix.row_mut(i).assign(&sample.data);
475 }
476
477 let weights = self.weights.as_ref().unwrap();
479 let bias = self.bias.as_ref().unwrap();
480 let features = self.compute_features(&data_matrix, weights, bias)?;
481
482 self.feature_statistics.update(&features);
483
484 self.apply_forgetting()?;
486
487 Ok(())
488 }
489
490 fn apply_forgetting(&mut self) -> Result<()> {
492 match &self.config.forgetting_mechanism {
493 ForgettingMechanism::None => {
494 }
496 ForgettingMechanism::LinearDecay(decay_rate) => {
497 for sample in &mut self.buffer {
498 sample.weight *= 1.0 - decay_rate;
499 }
500 }
501 ForgettingMechanism::ExponentialDecay(decay_rate) => {
502 for sample in &mut self.buffer {
503 sample.weight *= decay_rate;
504 }
505 }
506 ForgettingMechanism::AbruptForgetting(time_threshold) => {
507 if let Some(newest) = self.buffer.back() {
508 let cutoff_time = newest.timestamp - time_threshold;
509 self.buffer.retain(|s| s.timestamp >= cutoff_time);
510 }
511 }
512 ForgettingMechanism::SigmoidDecay {
513 steepness,
514 midpoint,
515 } => {
516 if let Some(newest_timestamp) = self.buffer.back().map(|s| s.timestamp) {
517 for sample in &mut self.buffer {
518 let age = newest_timestamp - sample.timestamp;
519 let sigmoid_weight = 1.0 / (1.0 + (steepness * (age - midpoint)).exp());
520 sample.weight *= sigmoid_weight;
521 }
522 }
523 }
524 }
525
526 Ok(())
527 }
528
529 fn generate_weights(&mut self, n_features: usize) -> Result<Array2<f64>> {
531 let mut weights = Array2::zeros((self.n_components, n_features));
532
533 for i in 0..self.n_components {
534 for j in 0..n_features {
535 weights[[i, j]] =
536 self.rng.sample::<f64, _>(StandardNormal) * (2.0 * self.gamma).sqrt();
537 }
538 }
539
540 Ok(weights)
541 }
542
543 fn generate_bias(&mut self) -> Result<Array1<f64>> {
545 let mut bias = Array1::zeros(self.n_components);
546
547 for i in 0..self.n_components {
548 bias[i] = self.rng.gen_range(0.0..2.0 * std::f64::consts::PI);
549 }
550
551 Ok(bias)
552 }
553
554 fn compute_features(
556 &self,
557 x: &Array2<f64>,
558 weights: &Array2<f64>,
559 bias: &Array1<f64>,
560 ) -> Result<Array2<f64>> {
561 let (n_samples, _) = x.dim();
562 let n_components = weights.nrows();
563
564 let projection = x.dot(&weights.t()) + bias;
566
567 let mut features = Array2::zeros((n_samples, n_components));
569 let norm_factor = (2.0 / n_components as f64).sqrt();
570
571 for i in 0..n_samples {
572 for j in 0..n_components {
573 features[[i, j]] = norm_factor * projection[[i, j]].cos();
574 }
575 }
576
577 Ok(features)
578 }
579}
580
/// Streaming Nyström kernel approximator: maintains a set of inducing points
/// and the eigendecomposition of their RBF kernel matrix, refreshed
/// periodically from a sample buffer.
pub struct StreamingNystroem {
    /// Target number of inducing points / output feature columns.
    n_components: usize,
    /// RBF kernel parameter (kernel exp(-gamma * ||x - y||^2)).
    gamma: f64,
    /// Streaming behavior; only `FixedSize` buffering is enforced here.
    config: StreamingConfig,
    /// Selected inducing rows; None until fitted.
    inducing_points: Option<Array2<f64>>,
    /// Eigenvalues of the inducing kernel matrix; None until fitted.
    eigenvalues: Option<Array1<f64>>,
    /// Matching eigenvectors; None until fitted.
    eigenvectors: Option<Array2<f64>>,
    /// Retained samples used to refresh the decomposition.
    buffer: VecDeque<StreamingSample>,
    /// Total samples ever ingested.
    sample_count: usize,
    /// Value of `sample_count` at the last refresh.
    last_update: usize,
    /// Seed recorded by `with_random_state`, if any.
    random_state: Option<u64>,
    /// RNG used for inducing-point selection.
    rng: StdRng,
}
598
599impl StreamingNystroem {
600 pub fn new(n_components: usize, gamma: f64) -> Self {
602 let rng = StdRng::from_seed(thread_rng().gen());
603 Self {
604 n_components,
605 gamma,
606 config: StreamingConfig::default(),
607 inducing_points: None,
608 eigenvalues: None,
609 eigenvectors: None,
610 buffer: VecDeque::new(),
611 sample_count: 0,
612 last_update: 0,
613 random_state: None,
614 rng,
615 }
616 }
617
618 pub fn with_config(mut self, config: StreamingConfig) -> Self {
620 self.config = config;
621 self
622 }
623
624 pub fn with_random_state(mut self, random_state: u64) -> Self {
626 self.random_state = Some(random_state);
627 self.rng = StdRng::seed_from_u64(random_state);
628 self
629 }
630
631 pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
633 let inducing_indices = self.select_inducing_points(x)?;
635 let inducing_points = x.select(Axis(0), &inducing_indices);
636
637 let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
639 let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
640
641 self.inducing_points = Some(inducing_points);
642 self.eigenvalues = Some(eigenvalues);
643 self.eigenvectors = Some(eigenvectors);
644
645 for (i, row) in x.rows().into_iter().enumerate() {
647 let sample = StreamingSample::new(row.to_owned(), i as f64);
648 self.buffer.push_back(sample);
649 }
650
651 self.sample_count = x.nrows();
652
653 Ok(())
654 }
655
656 pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
658 self.buffer.push_back(sample);
659 self.sample_count += 1;
660
661 match &self.config.buffer_strategy {
663 BufferStrategy::FixedSize(max_size) => {
664 if self.buffer.len() > *max_size {
665 self.buffer.pop_front();
666 }
667 }
668 _ => {
669 }
671 }
672
673 if self.should_update()? {
675 self.update_model()?;
676 self.last_update = self.sample_count;
677 }
678
679 Ok(())
680 }
681
682 pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
684 let inducing_points =
685 self.inducing_points
686 .as_ref()
687 .ok_or_else(|| SklearsError::NotFitted {
688 operation: "transform".to_string(),
689 })?;
690 let eigenvalues = self
691 .eigenvalues
692 .as_ref()
693 .ok_or_else(|| SklearsError::NotFitted {
694 operation: "transform".to_string(),
695 })?;
696 let eigenvectors = self
697 .eigenvectors
698 .as_ref()
699 .ok_or_else(|| SklearsError::NotFitted {
700 operation: "transform".to_string(),
701 })?;
702
703 let kernel_x_inducing = self.compute_kernel(x, inducing_points)?;
705
706 let mut features = kernel_x_inducing.dot(eigenvectors);
708
709 for i in 0..eigenvalues.len() {
711 if eigenvalues[i] > 1e-12 {
712 let scale = 1.0 / eigenvalues[i].sqrt();
713 for j in 0..features.nrows() {
714 features[[j, i]] *= scale;
715 }
716 }
717 }
718
719 Ok(features)
720 }
721
722 fn should_update(&self) -> Result<bool> {
724 Ok(self.sample_count - self.last_update >= 100)
726 }
727
728 fn update_model(&mut self) -> Result<()> {
730 if self.buffer.is_empty() {
731 return Ok(());
732 }
733
734 let n_samples = self.buffer.len();
736 let n_features = self.buffer[0].data.len();
737 let mut data_matrix = Array2::zeros((n_samples, n_features));
738
739 for (i, sample) in self.buffer.iter().enumerate() {
740 data_matrix.row_mut(i).assign(&sample.data);
741 }
742
743 let inducing_indices = self.select_inducing_points(&data_matrix)?;
745 let inducing_points = data_matrix.select(Axis(0), &inducing_indices);
746
747 let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
749 let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
750
751 self.inducing_points = Some(inducing_points);
752 self.eigenvalues = Some(eigenvalues);
753 self.eigenvectors = Some(eigenvectors);
754
755 Ok(())
756 }
757
758 fn select_inducing_points(&mut self, x: &Array2<f64>) -> Result<Vec<usize>> {
760 let n_samples = x.nrows();
761 let n_inducing = self.n_components.min(n_samples);
762
763 let mut indices = Vec::new();
764 for _ in 0..n_inducing {
765 indices.push(self.rng.gen_range(0..n_samples));
766 }
767
768 Ok(indices)
769 }
770
771 fn compute_kernel_matrix(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
773 let n_samples = x.nrows();
774 let mut kernel_matrix = Array2::zeros((n_samples, n_samples));
775
776 for i in 0..n_samples {
777 for j in i..n_samples {
778 let diff = &x.row(i) - &x.row(j);
779 let squared_dist = diff.mapv(|x| x * x).sum();
780 let kernel_val = (-self.gamma * squared_dist).exp();
781 kernel_matrix[[i, j]] = kernel_val;
782 kernel_matrix[[j, i]] = kernel_val;
783 }
784 }
785
786 Ok(kernel_matrix)
787 }
788
789 fn compute_kernel(&self, x: &Array2<f64>, y: &Array2<f64>) -> Result<Array2<f64>> {
791 let (n_samples_x, _) = x.dim();
792 let (n_samples_y, _) = y.dim();
793 let mut kernel_matrix = Array2::zeros((n_samples_x, n_samples_y));
794
795 for i in 0..n_samples_x {
796 for j in 0..n_samples_y {
797 let diff = &x.row(i) - &y.row(j);
798 let squared_dist = diff.mapv(|x| x * x).sum();
799 let kernel_val = (-self.gamma * squared_dist).exp();
800 kernel_matrix[[i, j]] = kernel_val;
801 }
802 }
803
804 Ok(kernel_matrix)
805 }
806
807 fn eigendecomposition(&self, matrix: &Array2<f64>) -> Result<(Array1<f64>, Array2<f64>)> {
809 let n = matrix.nrows();
810 let eigenvalues = Array1::ones(self.n_components.min(n));
811 let eigenvectors = Array2::eye(n)
812 .slice(s![.., ..self.n_components.min(n)])
813 .to_owned();
814
815 Ok((eigenvalues, eigenvectors))
816 }
817}
818
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    // Fit + transform should produce one row per input sample and one column
    // per requested random feature component.
    #[test]
    fn test_streaming_rbf_sampler_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut sampler = StreamingRBFSampler::new(50, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();
        let features = sampler.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 50);
    }

    // The builder setters should populate weight, importance, and label.
    #[test]
    fn test_streaming_sample() {
        let data = array![1.0, 2.0, 3.0];
        let sample = StreamingSample::new(data.clone(), 1.0)
            .with_weight(0.8)
            .with_importance(0.9)
            .with_label(1.0);

        assert_eq!(sample.data, data);
        assert_eq!(sample.timestamp, 1.0);
        assert_eq!(sample.weight, 0.8);
        assert_eq!(sample.importance, 0.9);
        assert_eq!(sample.label, Some(1.0));
    }

    // A FixedSize(3) buffer must retain only the 3 most recent of 5 samples.
    #[test]
    fn test_buffer_strategies() {
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_config(StreamingConfig {
            buffer_strategy: BufferStrategy::FixedSize(3),
            ..Default::default()
        });

        for i in 0..5 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (size, _, _) = sampler.buffer_stats();
        assert_eq!(size, 3);
    }

    // Nyström fit + transform should yield n_components feature columns.
    #[test]
    fn test_streaming_nystroem_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut nystroem = StreamingNystroem::new(3, 0.1).with_random_state(42);

        nystroem.fit(&x).unwrap();
        let features = nystroem.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 3);
    }

    // A single batch update should record the batch size and per-column means.
    #[test]
    fn test_feature_statistics() {
        let mut stats = FeatureStatistics::new(3);
        let features = array![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]];

        stats.update(&features);

        assert_eq!(stats.update_count, 2);
        assert!((stats.mean[0] - 2.5).abs() < 1e-10);
        assert!((stats.mean[1] - 3.5).abs() < 1e-10);
        assert!((stats.mean[2] - 4.5).abs() < 1e-10);
    }

    // Single-sample transform should produce an n_components-length vector.
    #[test]
    fn test_transform_sample() {
        let x = array![[1.0, 2.0], [3.0, 4.0]];
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();

        let sample = array![5.0, 6.0];
        let features = sampler.transform_sample(&sample).unwrap();

        assert_eq!(features.len(), 10);
    }

    // Struct-update construction should override the listed fields and keep
    // defaults for the rest.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig {
            buffer_strategy: BufferStrategy::SlidingWindow {
                size: 100,
                time_window: 10.0,
            },
            update_frequency: UpdateFrequency::BatchSize(50),
            forgetting_mechanism: ForgettingMechanism::LinearDecay(0.01),
            adaptive_components: true,
            ..Default::default()
        };

        assert!(matches!(
            config.buffer_strategy,
            BufferStrategy::SlidingWindow { .. }
        ));
        assert!(matches!(
            config.update_frequency,
            UpdateFrequency::BatchSize(50)
        ));
        assert!(config.adaptive_components);
    }

    // Streaming additions after fit should keep the buffer non-empty and the
    // periodic (BatchSize(2)) update path exercised without panicking.
    #[test]
    fn test_online_updates() {
        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                update_frequency: UpdateFrequency::BatchSize(2),
                ..Default::default()
            })
            .with_random_state(42);

        let x_init = array![[1.0, 2.0], [3.0, 4.0]];
        sampler.fit(&x_init).unwrap();

        for i in 5..10 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (buffer_size, _, _) = sampler.buffer_stats();
        assert!(buffer_size > 0);
    }

    // Drift detection on a shifted batch should produce a non-negative score;
    // the boolean outcome itself is not asserted (it depends on the random
    // projection), only that the path runs and updates the statistics.
    #[test]
    fn test_drift_detection() {
        let x1 = array![[1.0, 2.0], [1.1, 2.1], [0.9, 1.9]];
        let x2 = array![[5.0, 6.0], [5.1, 6.1], [4.9, 5.9]];
        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                drift_detection: true,
                ..Default::default()
            })
            .with_random_state(42);

        sampler.fit(&x1).unwrap();
        let _drift_detected = sampler.detect_drift(&x2).unwrap();

        assert!(sampler.feature_stats().drift_score >= 0.0);
    }
}