1use scirs2_core::ndarray::{s, Array1, Array2, Axis};
2use scirs2_core::random::rngs::StdRng;
3use scirs2_core::random::{thread_rng, Rng, SeedableRng, StandardNormal};
4use sklears_core::error::{Result, SklearsError};
5use std::collections::VecDeque;
6
/// Strategy used to decide which incoming samples are retained in the
/// streaming buffer and which are evicted.
#[derive(Debug, Clone)]
pub enum BufferStrategy {
    /// Keep at most this many samples; the oldest sample is evicted first (FIFO).
    FixedSize(usize),
    /// Bound the buffer by both a maximum sample count and a time horizon:
    /// samples older than `time_window` (relative to the newest timestamp)
    /// are dropped, and the count never exceeds `size`.
    SlidingWindow { size: usize, time_window: f64 },
    /// Classic reservoir sampling (Algorithm R) over the stream with the
    /// given reservoir capacity: each sample has an equal chance of being
    /// retained regardless of arrival order.
    ReservoirSampling(usize),
    /// Multiply every stored sample's weight by `alpha` on each insertion
    /// and drop samples whose weight falls below `min_weight`; the buffer
    /// size is therefore bounded only indirectly by the decay.
    ExponentialDecay { alpha: f64, min_weight: f64 },
    /// Keep up to `capacity` samples; once full, a new sample replaces the
    /// least-important stored sample only if its importance exceeds that
    /// sample's importance by more than `threshold`.
    ImportanceWeighted { capacity: usize, threshold: f64 },
}
27
/// Policy controlling how often the model is re-estimated from the buffer.
#[derive(Debug, Clone)]
pub enum UpdateFrequency {
    /// Re-estimate after every single sample.
    PerSample,
    /// Re-estimate after this many samples have arrived since the last update.
    BatchSize(usize),
    /// Re-estimate on a wall-clock interval.
    /// NOTE(review): the visible implementation approximates this with a
    /// fixed 100-sample batch — confirm intended semantics.
    TimeInterval(f64),
    /// Re-estimate when approximation error exceeds a threshold.
    /// NOTE(review): the visible implementation approximates this with a
    /// fixed 50-sample batch — confirm intended semantics.
    ErrorThreshold(f64),
    /// Adaptive batch size bounded by `min`/`max`.
    /// NOTE(review): only `initial` is consulted by the visible code.
    Adaptive {
        initial: usize,
        max: usize,
        min: usize,
    },
}
46
/// How stored samples lose influence (weight) as the stream progresses.
/// Applied once per model update (see `apply_forgetting`).
#[derive(Debug, Clone)]
pub enum ForgettingMechanism {
    /// Weights are never decayed.
    None,
    /// Per-update multiplicative decay: `weight *= 1.0 - rate`.
    /// (Despite the name, repeated application is geometric, not linear.)
    LinearDecay(f64),
    /// Per-update multiplicative decay: `weight *= rate` (rate in (0, 1)).
    ExponentialDecay(f64),
    /// Drop all samples older than this many time units behind the newest
    /// sample's timestamp.
    AbruptForgetting(f64),
    /// Smooth age-based decay: weight is multiplied by a logistic factor
    /// `1 / (1 + exp(steepness * (age - midpoint)))`.
    SigmoidDecay { steepness: f64, midpoint: f64 },
}
61
/// Configuration shared by the streaming kernel approximators in this module.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// How incoming samples are retained and evicted.
    pub buffer_strategy: BufferStrategy,
    /// When the model is re-estimated from the buffered samples.
    pub update_frequency: UpdateFrequency,
    /// How buffered samples lose weight over time.
    pub forgetting_mechanism: ForgettingMechanism,
    /// Soft memory budget in MB.
    /// NOTE(review): not enforced by any code visible in this file — confirm
    /// whether a caller uses it.
    pub max_memory_mb: Option<usize>,
    /// Whether the number of components may adapt online.
    /// NOTE(review): not consulted by the visible code.
    pub adaptive_components: bool,
    /// Whether approximation-quality monitoring is enabled.
    /// NOTE(review): not consulted by the visible code.
    pub quality_monitoring: bool,
    /// Enables `detect_drift` (when false, drift checks return `false`).
    pub drift_detection: bool,
    /// Threshold on the mean-shift drift score above which concept drift is
    /// reported.
    pub concept_drift_threshold: f64,
}
82
impl Default for StreamingConfig {
    /// Defaults: 1000-sample FIFO buffer, model update every 100 samples,
    /// exponential forgetting with factor 0.99, a 100 MB soft memory cap,
    /// drift detection disabled with a 0.1 threshold.
    fn default() -> Self {
        Self {
            buffer_strategy: BufferStrategy::FixedSize(1000),
            update_frequency: UpdateFrequency::BatchSize(100),
            forgetting_mechanism: ForgettingMechanism::ExponentialDecay(0.99),
            max_memory_mb: Some(100),
            adaptive_components: true,
            quality_monitoring: true,
            drift_detection: false,
            concept_drift_threshold: 0.1,
        }
    }
}
97
/// A single observation flowing through a streaming approximator.
#[derive(Debug, Clone)]
pub struct StreamingSample {
    /// Feature vector of the observation.
    pub data: Array1<f64>,
    /// Arrival time (callers in this file use the row index as the timestamp).
    pub timestamp: f64,
    /// Current influence of the sample; decayed by forgetting mechanisms.
    pub weight: f64,
    /// Importance score used by `BufferStrategy::ImportanceWeighted`.
    pub importance: f64,
    /// Optional supervised label attached to the sample.
    pub label: Option<f64>,
}
112
113impl StreamingSample {
114 pub fn new(data: Array1<f64>, timestamp: f64) -> Self {
115 Self {
116 data,
117 timestamp,
118 weight: 1.0,
119 importance: 1.0,
120 label: None,
121 }
122 }
123
124 pub fn with_weight(mut self, weight: f64) -> Self {
125 self.weight = weight;
126 self
127 }
128
129 pub fn with_importance(mut self, importance: f64) -> Self {
130 self.importance = importance;
131 self
132 }
133
134 pub fn with_label(mut self, label: f64) -> Self {
135 self.label = Some(label);
136 self
137 }
138}
139
/// Streaming random-Fourier-feature approximation of the RBF kernel.
///
/// Maintains a fixed random projection (`weights`, `bias`), a bounded sample
/// buffer governed by the configured `BufferStrategy`, and running statistics
/// of the produced feature columns.
pub struct StreamingRBFSampler {
    // Number of random Fourier features produced per sample.
    n_components: usize,
    // RBF bandwidth parameter; projection weights are scaled by sqrt(2*gamma).
    gamma: f64,
    // Buffer / update / forgetting / drift configuration.
    config: StreamingConfig,
    // Random projection matrix, (n_components, n_features); None until fitted.
    weights: Option<Array2<f64>>,
    // Random phase offsets in [0, 2*pi); None until fitted.
    bias: Option<Array1<f64>>,
    // Retained samples, managed by the buffer strategy.
    buffer: VecDeque<StreamingSample>,
    // Total number of samples ever ingested.
    sample_count: usize,
    // Value of `sample_count` at the last model update.
    last_update: usize,
    // Running per-column statistics of the transformed features.
    feature_statistics: FeatureStatistics,
    // Optional seed for reproducibility.
    random_state: Option<u64>,
    // RNG used for weight/bias generation and reservoir sampling.
    rng: StdRng,
}
157
/// Running per-column statistics of a stream of feature matrices, plus a
/// drift score derived from shifts of the running mean.
#[derive(Debug, Clone)]
pub struct FeatureStatistics {
    /// Running mean of each feature column.
    pub mean: Array1<f64>,
    /// Running (approximate, batch-blended) variance of each column.
    pub variance: Array1<f64>,
    /// Minimum value seen per column.
    pub min: Array1<f64>,
    /// Maximum value seen per column.
    pub max: Array1<f64>,
    /// Total number of rows folded in so far.
    pub update_count: usize,
    /// Approximation error estimate.
    /// NOTE(review): never written by the visible code — confirm its producer.
    pub approximation_error: f64,
    /// Last computed mean-shift drift score (see `detect_drift`).
    pub drift_score: f64,
}
176
177impl FeatureStatistics {
178 pub fn new(n_components: usize) -> Self {
179 Self {
180 mean: Array1::zeros(n_components),
181 variance: Array1::zeros(n_components),
182 min: Array1::from_elem(n_components, f64::INFINITY),
183 max: Array1::from_elem(n_components, f64::NEG_INFINITY),
184 update_count: 0,
185 approximation_error: 0.0,
186 drift_score: 0.0,
187 }
188 }
189
190 pub fn update(&mut self, features: &Array2<f64>) {
191 let n_samples = features.nrows();
192
193 for i in 0..features.ncols() {
194 let col = features.column(i);
195 let new_mean = col.mean().unwrap_or(0.0);
196 let new_var = col.mapv(|x| (x - new_mean).powi(2)).mean().unwrap_or(0.0);
197 let new_min = col.iter().fold(f64::INFINITY, |a, &b| a.min(b));
198 let new_max = col.iter().fold(f64::NEG_INFINITY, |a, &b| a.max(b));
199
200 let old_count = self.update_count;
202 let new_count = old_count + n_samples;
203
204 if old_count == 0 {
205 self.mean[i] = new_mean;
206 self.variance[i] = new_var;
207 } else {
208 let alpha = n_samples as f64 / new_count as f64;
209 self.mean[i] = (1.0 - alpha) * self.mean[i] + alpha * new_mean;
210 self.variance[i] = (1.0 - alpha) * self.variance[i] + alpha * new_var;
211 }
212
213 self.min[i] = self.min[i].min(new_min);
214 self.max[i] = self.max[i].max(new_max);
215 }
216
217 self.update_count += n_samples;
218 }
219
220 pub fn detect_drift(&mut self, new_features: &Array2<f64>) -> bool {
221 let old_mean = self.mean.clone();
222 self.update(new_features);
223
224 let mean_shift = (&self.mean - &old_mean).mapv(f64::abs).sum();
226 self.drift_score = mean_shift / self.mean.len() as f64;
227
228 self.drift_score > 0.1 }
230}
231
232impl StreamingRBFSampler {
233 pub fn new(n_components: usize, gamma: f64) -> Self {
235 let rng = StdRng::from_seed(thread_rng().gen());
236 Self {
237 n_components,
238 gamma,
239 config: StreamingConfig::default(),
240 weights: None,
241 bias: None,
242 buffer: VecDeque::new(),
243 sample_count: 0,
244 last_update: 0,
245 feature_statistics: FeatureStatistics::new(n_components),
246 random_state: None,
247 rng,
248 }
249 }
250
251 pub fn with_config(mut self, config: StreamingConfig) -> Self {
253 self.config = config;
254 self
255 }
256
257 pub fn with_random_state(mut self, random_state: u64) -> Self {
259 self.random_state = Some(random_state);
260 self.rng = StdRng::seed_from_u64(random_state);
261 self
262 }
263
264 pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
266 let (_, n_features) = x.dim();
267
268 self.weights = Some(self.generate_weights(n_features)?);
270 self.bias = Some(self.generate_bias()?);
271
272 for (i, row) in x.rows().into_iter().enumerate() {
274 let sample = StreamingSample::new(row.to_owned(), i as f64);
275 self.add_sample(sample)?;
276 }
277
278 Ok(())
279 }
280
281 pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
283 if self.weights.is_none() {
285 let n_features = sample.data.len();
286 self.weights = Some(self.generate_weights(n_features)?);
287 self.bias = Some(self.generate_bias()?);
288 }
289
290 self.manage_buffer(sample)?;
292
293 self.sample_count += 1;
294
295 if self.should_update()? {
297 self.update_model()?;
298 self.last_update = self.sample_count;
299 }
300
301 Ok(())
302 }
303
304 pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
306 let weights = self
307 .weights
308 .as_ref()
309 .ok_or_else(|| SklearsError::NotFitted {
310 operation: "transform".to_string(),
311 })?;
312 let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
313 operation: "transform".to_string(),
314 })?;
315
316 self.compute_features(x, weights, bias)
317 }
318
319 pub fn transform_sample(&self, sample: &Array1<f64>) -> Result<Array1<f64>> {
321 let weights = self
322 .weights
323 .as_ref()
324 .ok_or_else(|| SklearsError::NotFitted {
325 operation: "transform_sample".to_string(),
326 })?;
327 let bias = self.bias.as_ref().ok_or_else(|| SklearsError::NotFitted {
328 operation: "transform_sample".to_string(),
329 })?;
330
331 let projection = sample.dot(&weights.t()) + bias;
333 let norm_factor = (2.0 / self.n_components as f64).sqrt();
334
335 Ok(projection.mapv(|x| norm_factor * x.cos()))
336 }
337
338 pub fn buffer_stats(&self) -> (usize, f64, f64) {
340 let size = self.buffer.len();
341 let avg_weight = if size > 0 {
342 self.buffer.iter().map(|s| s.weight).sum::<f64>() / size as f64
343 } else {
344 0.0
345 };
346 let avg_importance = if size > 0 {
347 self.buffer.iter().map(|s| s.importance).sum::<f64>() / size as f64
348 } else {
349 0.0
350 };
351
352 (size, avg_weight, avg_importance)
353 }
354
355 pub fn feature_stats(&self) -> &FeatureStatistics {
357 &self.feature_statistics
358 }
359
360 pub fn detect_drift(&mut self, x: &Array2<f64>) -> Result<bool> {
362 if !self.config.drift_detection {
363 return Ok(false);
364 }
365
366 let features = self.transform(x)?;
367 Ok(self.feature_statistics.detect_drift(&features))
368 }
369
370 fn manage_buffer(&mut self, sample: StreamingSample) -> Result<()> {
372 match &self.config.buffer_strategy {
373 BufferStrategy::FixedSize(max_size) => {
374 if self.buffer.len() >= *max_size {
375 self.buffer.pop_front();
376 }
377 self.buffer.push_back(sample);
378 }
379 BufferStrategy::SlidingWindow { size, time_window } => {
380 let current_time = sample.timestamp;
382 while let Some(front) = self.buffer.front() {
383 if current_time - front.timestamp > *time_window {
384 self.buffer.pop_front();
385 } else {
386 break;
387 }
388 }
389
390 if self.buffer.len() >= *size {
392 self.buffer.pop_front();
393 }
394 self.buffer.push_back(sample);
395 }
396 BufferStrategy::ReservoirSampling(capacity) => {
397 if self.buffer.len() < *capacity {
398 self.buffer.push_back(sample);
399 } else {
400 let replace_idx = self.rng.gen_range(0..=self.sample_count);
401 if replace_idx < *capacity {
402 self.buffer[replace_idx] = sample;
403 }
404 }
405 }
406 BufferStrategy::ExponentialDecay { alpha, min_weight } => {
407 for existing_sample in &mut self.buffer {
409 existing_sample.weight *= alpha;
410 }
411
412 self.buffer.retain(|s| s.weight >= *min_weight);
414
415 self.buffer.push_back(sample);
416 }
417 BufferStrategy::ImportanceWeighted {
418 capacity,
419 threshold,
420 } => {
421 if self.buffer.len() < *capacity {
422 self.buffer.push_back(sample);
423 } else {
424 if let Some((min_idx, _)) =
426 self.buffer.iter().enumerate().min_by(|(_, a), (_, b)| {
427 a.importance.partial_cmp(&b.importance).unwrap()
428 })
429 {
430 if sample.importance > self.buffer[min_idx].importance + threshold {
431 self.buffer[min_idx] = sample;
432 }
433 }
434 }
435 }
436 }
437
438 Ok(())
439 }
440
441 fn should_update(&self) -> Result<bool> {
443 match &self.config.update_frequency {
444 UpdateFrequency::PerSample => Ok(true),
445 UpdateFrequency::BatchSize(batch_size) => {
446 Ok(self.sample_count - self.last_update >= *batch_size)
447 }
448 UpdateFrequency::TimeInterval(_time_interval) => {
449 Ok(self.sample_count - self.last_update >= 100)
451 }
452 UpdateFrequency::ErrorThreshold(_threshold) => {
453 Ok(self.sample_count - self.last_update >= 50)
455 }
456 UpdateFrequency::Adaptive {
457 initial,
458 max: _,
459 min: _,
460 } => Ok(self.sample_count - self.last_update >= *initial),
461 }
462 }
463
464 fn update_model(&mut self) -> Result<()> {
466 if self.buffer.is_empty() {
467 return Ok(());
468 }
469
470 let mut data_matrix = Array2::zeros((self.buffer.len(), self.buffer[0].data.len()));
472 for (i, sample) in self.buffer.iter().enumerate() {
473 data_matrix.row_mut(i).assign(&sample.data);
474 }
475
476 let weights = self.weights.as_ref().unwrap();
478 let bias = self.bias.as_ref().unwrap();
479 let features = self.compute_features(&data_matrix, weights, bias)?;
480
481 self.feature_statistics.update(&features);
482
483 self.apply_forgetting()?;
485
486 Ok(())
487 }
488
489 fn apply_forgetting(&mut self) -> Result<()> {
491 match &self.config.forgetting_mechanism {
492 ForgettingMechanism::None => {
493 }
495 ForgettingMechanism::LinearDecay(decay_rate) => {
496 for sample in &mut self.buffer {
497 sample.weight *= 1.0 - decay_rate;
498 }
499 }
500 ForgettingMechanism::ExponentialDecay(decay_rate) => {
501 for sample in &mut self.buffer {
502 sample.weight *= decay_rate;
503 }
504 }
505 ForgettingMechanism::AbruptForgetting(time_threshold) => {
506 if let Some(newest) = self.buffer.back() {
507 let cutoff_time = newest.timestamp - time_threshold;
508 self.buffer.retain(|s| s.timestamp >= cutoff_time);
509 }
510 }
511 ForgettingMechanism::SigmoidDecay {
512 steepness,
513 midpoint,
514 } => {
515 if let Some(newest_timestamp) = self.buffer.back().map(|s| s.timestamp) {
516 for sample in &mut self.buffer {
517 let age = newest_timestamp - sample.timestamp;
518 let sigmoid_weight = 1.0 / (1.0 + (steepness * (age - midpoint)).exp());
519 sample.weight *= sigmoid_weight;
520 }
521 }
522 }
523 }
524
525 Ok(())
526 }
527
528 fn generate_weights(&mut self, n_features: usize) -> Result<Array2<f64>> {
530 let mut weights = Array2::zeros((self.n_components, n_features));
531
532 for i in 0..self.n_components {
533 for j in 0..n_features {
534 weights[[i, j]] =
535 self.rng.sample::<f64, _>(StandardNormal) * (2.0 * self.gamma).sqrt();
536 }
537 }
538
539 Ok(weights)
540 }
541
542 fn generate_bias(&mut self) -> Result<Array1<f64>> {
544 let mut bias = Array1::zeros(self.n_components);
545
546 for i in 0..self.n_components {
547 bias[i] = self.rng.gen_range(0.0..2.0 * std::f64::consts::PI);
548 }
549
550 Ok(bias)
551 }
552
553 fn compute_features(
555 &self,
556 x: &Array2<f64>,
557 weights: &Array2<f64>,
558 bias: &Array1<f64>,
559 ) -> Result<Array2<f64>> {
560 let (n_samples, _) = x.dim();
561 let n_components = weights.nrows();
562
563 let projection = x.dot(&weights.t()) + bias;
565
566 let mut features = Array2::zeros((n_samples, n_components));
568 let norm_factor = (2.0 / n_components as f64).sqrt();
569
570 for i in 0..n_samples {
571 for j in 0..n_components {
572 features[[i, j]] = norm_factor * projection[[i, j]].cos();
573 }
574 }
575
576 Ok(features)
577 }
578}
579
/// Streaming Nyström approximation of an RBF kernel feature map.
///
/// Keeps a set of inducing points plus the eigendecomposition of their
/// kernel matrix, and periodically re-selects them from the sample buffer.
pub struct StreamingNystroem {
    // Target number of inducing points / output features.
    n_components: usize,
    // RBF bandwidth parameter: k(x, y) = exp(-gamma * ||x - y||^2).
    gamma: f64,
    // Buffer / update configuration.
    config: StreamingConfig,
    // Currently selected inducing points; None until fitted.
    inducing_points: Option<Array2<f64>>,
    // Eigenvalues of the inducing-point kernel matrix; None until fitted.
    eigenvalues: Option<Array1<f64>>,
    // Corresponding eigenvectors; None until fitted.
    eigenvectors: Option<Array2<f64>>,
    // Retained samples used for re-selecting inducing points.
    buffer: VecDeque<StreamingSample>,
    // Total number of samples ever ingested.
    sample_count: usize,
    // Value of `sample_count` at the last model update.
    last_update: usize,
    // Optional seed for reproducibility.
    random_state: Option<u64>,
    // RNG used for inducing-point selection.
    rng: StdRng,
}
597
598impl StreamingNystroem {
599 pub fn new(n_components: usize, gamma: f64) -> Self {
601 let rng = StdRng::from_seed(thread_rng().gen());
602 Self {
603 n_components,
604 gamma,
605 config: StreamingConfig::default(),
606 inducing_points: None,
607 eigenvalues: None,
608 eigenvectors: None,
609 buffer: VecDeque::new(),
610 sample_count: 0,
611 last_update: 0,
612 random_state: None,
613 rng,
614 }
615 }
616
617 pub fn with_config(mut self, config: StreamingConfig) -> Self {
619 self.config = config;
620 self
621 }
622
623 pub fn with_random_state(mut self, random_state: u64) -> Self {
625 self.random_state = Some(random_state);
626 self.rng = StdRng::seed_from_u64(random_state);
627 self
628 }
629
630 pub fn fit(&mut self, x: &Array2<f64>) -> Result<()> {
632 let inducing_indices = self.select_inducing_points(x)?;
634 let inducing_points = x.select(Axis(0), &inducing_indices);
635
636 let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
638 let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
639
640 self.inducing_points = Some(inducing_points);
641 self.eigenvalues = Some(eigenvalues);
642 self.eigenvectors = Some(eigenvectors);
643
644 for (i, row) in x.rows().into_iter().enumerate() {
646 let sample = StreamingSample::new(row.to_owned(), i as f64);
647 self.buffer.push_back(sample);
648 }
649
650 self.sample_count = x.nrows();
651
652 Ok(())
653 }
654
655 pub fn add_sample(&mut self, sample: StreamingSample) -> Result<()> {
657 self.buffer.push_back(sample);
658 self.sample_count += 1;
659
660 match &self.config.buffer_strategy {
662 BufferStrategy::FixedSize(max_size) => {
663 if self.buffer.len() > *max_size {
664 self.buffer.pop_front();
665 }
666 }
667 _ => {
668 }
670 }
671
672 if self.should_update()? {
674 self.update_model()?;
675 self.last_update = self.sample_count;
676 }
677
678 Ok(())
679 }
680
681 pub fn transform(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
683 let inducing_points =
684 self.inducing_points
685 .as_ref()
686 .ok_or_else(|| SklearsError::NotFitted {
687 operation: "transform".to_string(),
688 })?;
689 let eigenvalues = self
690 .eigenvalues
691 .as_ref()
692 .ok_or_else(|| SklearsError::NotFitted {
693 operation: "transform".to_string(),
694 })?;
695 let eigenvectors = self
696 .eigenvectors
697 .as_ref()
698 .ok_or_else(|| SklearsError::NotFitted {
699 operation: "transform".to_string(),
700 })?;
701
702 let kernel_x_inducing = self.compute_kernel(x, inducing_points)?;
704
705 let mut features = kernel_x_inducing.dot(eigenvectors);
707
708 for i in 0..eigenvalues.len() {
710 if eigenvalues[i] > 1e-12 {
711 let scale = 1.0 / eigenvalues[i].sqrt();
712 for j in 0..features.nrows() {
713 features[[j, i]] *= scale;
714 }
715 }
716 }
717
718 Ok(features)
719 }
720
721 fn should_update(&self) -> Result<bool> {
723 Ok(self.sample_count - self.last_update >= 100)
725 }
726
727 fn update_model(&mut self) -> Result<()> {
729 if self.buffer.is_empty() {
730 return Ok(());
731 }
732
733 let n_samples = self.buffer.len();
735 let n_features = self.buffer[0].data.len();
736 let mut data_matrix = Array2::zeros((n_samples, n_features));
737
738 for (i, sample) in self.buffer.iter().enumerate() {
739 data_matrix.row_mut(i).assign(&sample.data);
740 }
741
742 let inducing_indices = self.select_inducing_points(&data_matrix)?;
744 let inducing_points = data_matrix.select(Axis(0), &inducing_indices);
745
746 let kernel_matrix = self.compute_kernel_matrix(&inducing_points)?;
748 let (eigenvalues, eigenvectors) = self.eigendecomposition(&kernel_matrix)?;
749
750 self.inducing_points = Some(inducing_points);
751 self.eigenvalues = Some(eigenvalues);
752 self.eigenvectors = Some(eigenvectors);
753
754 Ok(())
755 }
756
757 fn select_inducing_points(&mut self, x: &Array2<f64>) -> Result<Vec<usize>> {
759 let n_samples = x.nrows();
760 let n_inducing = self.n_components.min(n_samples);
761
762 let mut indices = Vec::new();
763 for _ in 0..n_inducing {
764 indices.push(self.rng.gen_range(0..n_samples));
765 }
766
767 Ok(indices)
768 }
769
770 fn compute_kernel_matrix(&self, x: &Array2<f64>) -> Result<Array2<f64>> {
772 let n_samples = x.nrows();
773 let mut kernel_matrix = Array2::zeros((n_samples, n_samples));
774
775 for i in 0..n_samples {
776 for j in i..n_samples {
777 let diff = &x.row(i) - &x.row(j);
778 let squared_dist = diff.mapv(|x| x * x).sum();
779 let kernel_val = (-self.gamma * squared_dist).exp();
780 kernel_matrix[[i, j]] = kernel_val;
781 kernel_matrix[[j, i]] = kernel_val;
782 }
783 }
784
785 Ok(kernel_matrix)
786 }
787
788 fn compute_kernel(&self, x: &Array2<f64>, y: &Array2<f64>) -> Result<Array2<f64>> {
790 let (n_samples_x, _) = x.dim();
791 let (n_samples_y, _) = y.dim();
792 let mut kernel_matrix = Array2::zeros((n_samples_x, n_samples_y));
793
794 for i in 0..n_samples_x {
795 for j in 0..n_samples_y {
796 let diff = &x.row(i) - &y.row(j);
797 let squared_dist = diff.mapv(|x| x * x).sum();
798 let kernel_val = (-self.gamma * squared_dist).exp();
799 kernel_matrix[[i, j]] = kernel_val;
800 }
801 }
802
803 Ok(kernel_matrix)
804 }
805
806 fn eigendecomposition(&self, matrix: &Array2<f64>) -> Result<(Array1<f64>, Array2<f64>)> {
808 let n = matrix.nrows();
809 let eigenvalues = Array1::ones(self.n_components.min(n));
810 let eigenvectors = Array2::eye(n)
811 .slice(s![.., ..self.n_components.min(n)])
812 .to_owned();
813
814 Ok((eigenvalues, eigenvectors))
815 }
816}
817
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    /// Fit + transform yields an (n_samples, n_components) feature matrix.
    #[test]
    fn test_streaming_rbf_sampler_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut sampler = StreamingRBFSampler::new(50, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();
        let features = sampler.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 50);
    }

    /// Builder setters populate all sample fields.
    #[test]
    fn test_streaming_sample() {
        let data = array![1.0, 2.0, 3.0];
        let sample = StreamingSample::new(data.clone(), 1.0)
            .with_weight(0.8)
            .with_importance(0.9)
            .with_label(1.0);

        assert_eq!(sample.data, data);
        assert_eq!(sample.timestamp, 1.0);
        assert_eq!(sample.weight, 0.8);
        assert_eq!(sample.importance, 0.9);
        assert_eq!(sample.label, Some(1.0));
    }

    /// A FixedSize(3) buffer holds at most 3 samples after 5 insertions.
    #[test]
    fn test_buffer_strategies() {
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_config(StreamingConfig {
            buffer_strategy: BufferStrategy::FixedSize(3),
            ..Default::default()
        });

        for i in 0..5 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (size, _, _) = sampler.buffer_stats();
        assert_eq!(size, 3);
    }

    /// Nyström fit + transform yields an (n_samples, n_components) matrix.
    #[test]
    fn test_streaming_nystroem_basic() {
        let x = array![[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]];

        let mut nystroem = StreamingNystroem::new(3, 0.1).with_random_state(42);

        nystroem.fit(&x).unwrap();
        let features = nystroem.transform(&x).unwrap();

        assert_eq!(features.nrows(), 4);
        assert_eq!(features.ncols(), 3);
    }

    /// Column means of a single batch equal the plain arithmetic means.
    #[test]
    fn test_feature_statistics() {
        let mut stats = FeatureStatistics::new(3);
        let features = array![[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]];

        stats.update(&features);

        assert_eq!(stats.update_count, 2);
        assert!((stats.mean[0] - 2.5).abs() < 1e-10);
        assert!((stats.mean[1] - 3.5).abs() < 1e-10);
        assert!((stats.mean[2] - 4.5).abs() < 1e-10);
    }

    /// Single-row transform produces a vector of n_components features.
    #[test]
    fn test_transform_sample() {
        let x = array![[1.0, 2.0], [3.0, 4.0]];
        let mut sampler = StreamingRBFSampler::new(10, 0.1).with_random_state(42);

        sampler.fit(&x).unwrap();

        let sample = array![5.0, 6.0];
        let features = sampler.transform_sample(&sample).unwrap();

        assert_eq!(features.len(), 10);
    }

    /// Struct-update syntax composes with Default for partial configs.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig {
            buffer_strategy: BufferStrategy::SlidingWindow {
                size: 100,
                time_window: 10.0,
            },
            update_frequency: UpdateFrequency::BatchSize(50),
            forgetting_mechanism: ForgettingMechanism::LinearDecay(0.01),
            adaptive_components: true,
            ..Default::default()
        };

        assert!(matches!(
            config.buffer_strategy,
            BufferStrategy::SlidingWindow { .. }
        ));
        assert!(matches!(
            config.update_frequency,
            UpdateFrequency::BatchSize(50)
        ));
        assert!(config.adaptive_components);
    }

    /// Streaming updates after an initial fit keep the buffer non-empty.
    #[test]
    fn test_online_updates() {
        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                update_frequency: UpdateFrequency::BatchSize(2),
                ..Default::default()
            })
            .with_random_state(42);

        let x_init = array![[1.0, 2.0], [3.0, 4.0]];
        sampler.fit(&x_init).unwrap();

        for i in 5..10 {
            let data = array![i as f64, (i + 1) as f64];
            let sample = StreamingSample::new(data, i as f64);
            sampler.add_sample(sample).unwrap();
        }

        let (buffer_size, _, _) = sampler.buffer_stats();
        assert!(buffer_size > 0);
    }

    /// Drift detection runs cleanly and produces a sane (non-negative) score.
    #[test]
    fn test_drift_detection() {
        let x1 = array![[1.0, 2.0], [1.1, 2.1], [0.9, 1.9]];
        // Second batch has a large mean shift relative to the first.
        let x2 = array![[5.0, 6.0], [5.1, 6.1], [4.9, 5.9]];

        let mut sampler = StreamingRBFSampler::new(20, 0.1)
            .with_config(StreamingConfig {
                drift_detection: true,
                ..Default::default()
            })
            .with_random_state(42);

        sampler.fit(&x1).unwrap();
        // Whether the threshold trips depends on the RNG-derived features;
        // only require the call to succeed (leading underscore silences the
        // previously-unused-variable warning).
        let _drift_detected = sampler.detect_drift(&x2).unwrap();

        assert!(sampler.feature_stats().drift_score >= 0.0);
    }
}