1use scirs2_core::ndarray::{Array1, Array2, Axis};
7use sklears_core::error::{Result, SklearsError};
8use sklears_core::prelude::Predict;
9use sklears_core::traits::{Estimator, Fit, Trained, Untrained};
10use sklears_core::types::Float;
11use std::collections::VecDeque;
12use std::marker::PhantomData;
13
14#[cfg(feature = "parallel")]
15use rayon::prelude::*;
16
/// Configuration for a [`StreamingEnsemble`].
///
/// Only some fields are consulted by the code in this file (see notes);
/// the rest are carried for downstream / future use.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Maximum number of base models kept in the ensemble.
    pub max_models: usize,
    /// Number of recent per-model performance samples retained.
    pub performance_window_size: usize,
    /// Sensitivity threshold handed to the concept-drift detector.
    pub drift_threshold: Float,
    /// Learning rate for weight adaptation.
    /// NOTE(review): not consulted by the implementation in this file.
    pub weight_learning_rate: Float,
    /// Exponential forgetting factor for past observations.
    /// NOTE(review): not consulted by the implementation in this file.
    pub forgetting_factor: Float,
    /// Whether a concept-drift detector is attached during `fit`.
    pub enable_drift_detection: bool,
    /// Minimum observations before drift detection may trigger.
    /// NOTE(review): not consulted by the implementation in this file.
    pub min_samples_for_drift: usize,
    /// Warm-up period before adaptation.
    /// NOTE(review): not consulted by the implementation in this file.
    pub grace_period: usize,
    /// Whether the ensemble may grow/shrink based on performance.
    pub adaptive_ensemble_size: bool,
    /// Fraction of samples for bootstrapping new models.
    /// NOTE(review): not consulted by the implementation in this file.
    pub bootstrap_ratio: Float,
    /// Optional seed for reproducibility.
    /// NOTE(review): not consulted by the implementation in this file.
    pub random_state: Option<u64>,
}
43
impl Default for StreamingConfig {
    /// Moderate defaults: up to 10 models, drift detection on, adaptive sizing on.
    fn default() -> Self {
        Self {
            max_models: 10,
            performance_window_size: 1000,
            drift_threshold: 0.05,
            weight_learning_rate: 0.01,
            forgetting_factor: 0.99,
            enable_drift_detection: true,
            min_samples_for_drift: 100,
            grace_period: 50,
            adaptive_ensemble_size: true,
            bootstrap_ratio: 0.8,
            random_state: None,
        }
    }
}
61
/// Concept-drift detection algorithm used by [`ConceptDriftDetector`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DriftDetectionMethod {
    /// Adaptive windowing: split a sliding window and compare sub-window means.
    ADWIN,
    /// Page–Hinkley cumulative-deviation test.
    PageHinkley,
    /// Drift Detection Method (error-rate mean + std-dev control levels).
    DDM,
    /// Early Drift Detection Method (distance between consecutive errors).
    EDDM,
    /// Plain sliding-window error rate against a fixed threshold.
    ErrorRate,
}
76
/// Stateful concept-drift detector; one instance supports all methods in
/// [`DriftDetectionMethod`], dispatched per `update` call.
#[derive(Debug, Clone)]
pub struct ConceptDriftDetector {
    // Which detection algorithm `update` dispatches to.
    method: DriftDetectionMethod,
    // Error-rate threshold; only consulted by the `ErrorRate` method.
    threshold: Float,
    // Shared sample buffer used by the ADWIN / EDDM / ErrorRate methods.
    window: VecDeque<Float>,
    // Running error statistics for DDM.
    error_sum: Float,
    error_count: usize,
    // Minimum number of observations before any method may report drift.
    min_length: usize,
    // Outcome flags, recomputed on every `update` call.
    drift_detected: bool,
    warning_detected: bool,
    // ADWIN confidence parameter (delta).
    adwin_delta: Float,
    // Page–Hinkley parameters and running statistics.
    // NOTE(review): `ph_min_threshold` is initialised but never read in this file.
    ph_min_threshold: Float,
    ph_threshold: Float,
    ph_alpha: Float,
    ph_sum: Float,
    ph_min_sum: Float,
    // DDM best-so-far statistics ...
    p_min: Float,
    s_min: Float,
    // ... and its warning / out-of-control multipliers.
    warning_level: Float,
    out_control_level: Float,
}
102
103impl ConceptDriftDetector {
104 pub fn new(method: DriftDetectionMethod, threshold: Float) -> Self {
106 Self {
107 method,
108 threshold,
109 window: VecDeque::new(),
110 error_sum: 0.0,
111 error_count: 0,
112 min_length: 30,
113 drift_detected: false,
114 warning_detected: false,
115 adwin_delta: 0.002,
116 ph_min_threshold: 50.0,
117 ph_threshold: 50.0,
118 ph_alpha: 0.9999,
119 ph_sum: 0.0,
120 ph_min_sum: Float::INFINITY,
121 p_min: Float::INFINITY,
122 s_min: Float::INFINITY,
123 warning_level: 2.0,
124 out_control_level: 3.0,
125 }
126 }
127
128 pub fn update(&mut self, error: Float) -> (bool, bool) {
130 self.drift_detected = false;
131 self.warning_detected = false;
132
133 match self.method {
134 DriftDetectionMethod::ADWIN => self.update_adwin(error),
135 DriftDetectionMethod::PageHinkley => self.update_page_hinkley(error),
136 DriftDetectionMethod::DDM => self.update_ddm(error),
137 DriftDetectionMethod::EDDM => self.update_eddm(error),
138 DriftDetectionMethod::ErrorRate => self.update_error_rate(error),
139 }
140
141 (self.drift_detected, self.warning_detected)
142 }
143
144 fn update_adwin(&mut self, error: Float) {
145 self.window.push_back(error);
146
147 if self.window.len() < self.min_length {
148 return;
149 }
150
151 let n = self.window.len();
153 let total_sum: Float = self.window.iter().sum();
154 let total_mean = total_sum / n as Float;
155
156 for i in self.min_length..n - self.min_length {
158 let left_sum: Float = self.window.iter().take(i).sum();
159 let right_sum = total_sum - left_sum;
160
161 let left_mean = left_sum / i as Float;
162 let right_mean = right_sum / (n - i) as Float;
163
164 let diff = (left_mean - right_mean).abs();
165
166 let threshold = (2.0 * (2.0 / self.adwin_delta).ln() / i as Float).sqrt();
168
169 if diff > threshold {
170 self.drift_detected = true;
171 for _ in 0..i {
173 self.window.pop_front();
174 }
175 break;
176 }
177 }
178 }
179
180 fn update_page_hinkley(&mut self, error: Float) {
181 let target_mean = 0.5; self.ph_sum += (error - target_mean) - self.ph_alpha;
184
185 if self.ph_sum < self.ph_min_sum {
186 self.ph_min_sum = self.ph_sum;
187 }
188
189 let test_statistic = self.ph_sum - self.ph_min_sum;
190
191 if test_statistic > self.ph_threshold {
192 self.drift_detected = true;
193 self.ph_sum = 0.0;
194 self.ph_min_sum = Float::INFINITY;
195 }
196 }
197
198 fn update_ddm(&mut self, error: Float) {
199 self.error_count += 1;
200 self.error_sum += error;
201
202 if self.error_count < self.min_length {
203 return;
204 }
205
206 let p = self.error_sum / self.error_count as Float;
207 let s = (p * (1.0 - p) / self.error_count as Float).sqrt();
208
209 if self.p_min == Float::INFINITY || (p + s) < (self.p_min + self.s_min) {
210 self.p_min = p;
211 self.s_min = s;
212 }
213
214 if p + s > self.p_min + self.out_control_level * self.s_min {
215 self.drift_detected = true;
216 self.reset_ddm();
217 } else if p + s > self.p_min + self.warning_level * self.s_min {
218 self.warning_detected = true;
219 }
220 }
221
222 fn update_eddm(&mut self, error: Float) {
223 self.window.push_back(error);
225
226 if self.window.len() > 1000 {
227 self.window.pop_front();
228 }
229
230 if self.window.len() < self.min_length {
231 return;
232 }
233
234 let mut distances = Vec::new();
236 let mut last_error_pos = None;
237
238 for (i, &val) in self.window.iter().enumerate() {
239 if val > 0.5 {
240 if let Some(last_pos) = last_error_pos {
242 distances.push((i - last_pos) as Float);
243 }
244 last_error_pos = Some(i);
245 }
246 }
247
248 if distances.len() >= 2 {
249 let mean_distance: Float = distances.iter().sum::<Float>() / distances.len() as Float;
250 let std_distance = (distances
251 .iter()
252 .map(|&d| (d - mean_distance).powi(2))
253 .sum::<Float>()
254 / distances.len() as Float)
255 .sqrt();
256
257 let recent_distances: Vec<Float> = distances.iter().rev().take(5).cloned().collect();
259 if !recent_distances.is_empty() {
260 let recent_mean: Float =
261 recent_distances.iter().sum::<Float>() / recent_distances.len() as Float;
262
263 if recent_mean < mean_distance - 2.0 * std_distance {
264 self.drift_detected = true;
265 }
266 }
267 }
268 }
269
270 fn update_error_rate(&mut self, error: Float) {
271 self.window.push_back(error);
272
273 let window_size = 100;
274 if self.window.len() > window_size {
275 self.window.pop_front();
276 }
277
278 if self.window.len() < self.min_length {
279 return;
280 }
281
282 let error_rate: Float = self.window.iter().sum::<Float>() / self.window.len() as Float;
283
284 if error_rate > self.threshold {
285 self.drift_detected = true;
286 }
287 }
288
289 fn reset_ddm(&mut self) {
290 self.error_sum = 0.0;
291 self.error_count = 0;
292 self.p_min = Float::INFINITY;
293 self.s_min = Float::INFINITY;
294 }
295
296 pub fn reset(&mut self) {
298 self.window.clear();
299 self.error_sum = 0.0;
300 self.error_count = 0;
301 self.drift_detected = false;
302 self.warning_detected = false;
303 self.ph_sum = 0.0;
304 self.ph_min_sum = Float::INFINITY;
305 self.reset_ddm();
306 }
307}
308
/// Online ensemble of incrementally trained base models with optional
/// concept-drift detection and adaptive ensemble sizing.
///
/// The `State` type parameter (`Untrained` / `Trained`) encodes the fit
/// lifecycle at compile time.
pub struct StreamingEnsemble<State = Untrained> {
    config: StreamingConfig,
    state: PhantomData<State>,
    // Base models; `None` until the first model is added.
    models_: Option<Vec<Box<dyn StreamingModel>>>,
    // Normalized per-model combination weights (parallel to `models_`).
    model_weights_: Option<Array1<Float>>,
    // Per-model ages, kept in sync with `models_`.
    // NOTE(review): pushed/removed alongside models but never incremented here.
    model_ages_: Option<Vec<usize>>,
    // Bounded history of per-model performance scores (parallel to `models_`).
    model_performance_: Option<Vec<VecDeque<Float>>>,
    // Concept-drift detector; present when enabled in the config.
    drift_detector_: Option<ConceptDriftDetector>,
    // Stream statistics.
    samples_seen_: usize,
    drift_count_: usize,
    last_drift_position_: usize,
    // Exponential moving average of thresholded prediction accuracy.
    overall_accuracy_: Float,
    // Bounded history of (input, target, prediction) triples used for
    // diversity estimation and the add-model heuristic.
    recent_predictions_: VecDeque<(Array1<Float>, Float, Float)>,
}
327
/// Contract for base models that can learn one sample at a time inside a
/// [`StreamingEnsemble`].
pub trait StreamingModel: Send + Sync {
    /// Updates the model with a single observation.
    fn partial_fit(&mut self, x: &Array1<Float>, y: Float) -> Result<()>;

    /// Predicts the target for a single input vector.
    fn predict(&self, x: &Array1<Float>) -> Result<Float>;

    /// Self-reported performance score; higher is better. The provided
    /// implementation returns values in [0, 1].
    fn get_performance(&self) -> Float;

    /// Clears all learned state.
    fn reset(&mut self) -> Result<()>;

    /// Clones the model behind the trait object.
    fn clone_model(&self) -> Box<dyn StreamingModel>;
}
345
/// Linear regression trained by per-sample stochastic gradient descent;
/// the default base model used by [`StreamingEnsemble`].
#[derive(Debug, Clone)]
pub struct StreamingLinearRegression {
    // One weight per input feature.
    weights: Array1<Float>,
    bias: Float,
    // SGD step size.
    learning_rate: Float,
    // Total number of samples seen so far.
    n_samples: usize,
    // Bounded window (last 100) of absolute errors, for `get_performance`.
    recent_errors: VecDeque<Float>,
}
355
356impl StreamingLinearRegression {
357 pub fn new(n_features: usize, learning_rate: Float) -> Self {
358 Self {
359 weights: Array1::zeros(n_features),
360 bias: 0.0,
361 learning_rate,
362 n_samples: 0,
363 recent_errors: VecDeque::new(),
364 }
365 }
366}
367
368impl StreamingModel for StreamingLinearRegression {
369 fn partial_fit(&mut self, x: &Array1<Float>, y: Float) -> Result<()> {
370 let prediction = self.predict(x)?;
371 let error = y - prediction;
372
373 for i in 0..self.weights.len() {
375 self.weights[i] += self.learning_rate * error * x[i];
376 }
377 self.bias += self.learning_rate * error;
378
379 self.recent_errors.push_back(error.abs());
381 if self.recent_errors.len() > 100 {
382 self.recent_errors.pop_front();
383 }
384
385 self.n_samples += 1;
386 Ok(())
387 }
388
389 fn predict(&self, x: &Array1<Float>) -> Result<Float> {
390 Ok(self.weights.dot(x) + self.bias)
391 }
392
393 fn get_performance(&self) -> Float {
394 if self.recent_errors.is_empty() {
395 return 0.5; }
397
398 let mean_error: Float =
399 self.recent_errors.iter().sum::<Float>() / self.recent_errors.len() as Float;
400
401 (1.0 / (1.0 + mean_error)).min(1.0).max(0.0)
403 }
404
405 fn reset(&mut self) -> Result<()> {
406 self.weights.fill(0.0);
407 self.bias = 0.0;
408 self.n_samples = 0;
409 self.recent_errors.clear();
410 Ok(())
411 }
412
413 fn clone_model(&self) -> Box<dyn StreamingModel> {
414 Box::new(self.clone())
415 }
416}
417
418impl<State> StreamingEnsemble<State> {
419 pub fn model_count(&self) -> usize {
421 self.models_.as_ref().map_or(0, |models| models.len())
422 }
423
424 pub fn drift_count(&self) -> usize {
426 self.drift_count_
427 }
428
429 pub fn overall_accuracy(&self) -> Float {
431 self.overall_accuracy_
432 }
433
434 pub fn samples_seen(&self) -> usize {
436 self.samples_seen_
437 }
438}
439
440impl StreamingEnsemble<Untrained> {
441 pub fn new() -> Self {
443 Self {
444 config: StreamingConfig::default(),
445 state: PhantomData,
446 models_: None,
447 model_weights_: None,
448 model_ages_: None,
449 model_performance_: None,
450 drift_detector_: None,
451 samples_seen_: 0,
452 drift_count_: 0,
453 last_drift_position_: 0,
454 overall_accuracy_: 0.0,
455 recent_predictions_: VecDeque::new(),
456 }
457 }
458
459 pub fn max_models(mut self, max_models: usize) -> Self {
461 self.config.max_models = max_models;
462 self
463 }
464
465 pub fn drift_threshold(mut self, threshold: Float) -> Self {
467 self.config.drift_threshold = threshold;
468 self
469 }
470
471 pub fn weight_learning_rate(mut self, rate: Float) -> Self {
473 self.config.weight_learning_rate = rate;
474 self
475 }
476
477 pub fn forgetting_factor(mut self, factor: Float) -> Self {
479 self.config.forgetting_factor = factor;
480 self
481 }
482
483 pub fn enable_drift_detection(mut self, enabled: bool) -> Self {
485 self.config.enable_drift_detection = enabled;
486 self
487 }
488
489 pub fn performance_window_size(mut self, size: usize) -> Self {
491 self.config.performance_window_size = size;
492 self
493 }
494
495 pub fn adaptive_ensemble_size(mut self, enabled: bool) -> Self {
497 self.config.adaptive_ensemble_size = enabled;
498 self
499 }
500
501 pub fn for_concept_drift() -> Self {
503 Self::new()
504 .max_models(15)
505 .drift_threshold(0.03)
506 .weight_learning_rate(0.05)
507 .forgetting_factor(0.95)
508 .enable_drift_detection(true)
509 .performance_window_size(500)
510 .adaptive_ensemble_size(true)
511 }
512
513 pub fn for_real_time() -> Self {
515 Self::new()
516 .max_models(5)
517 .drift_threshold(0.1)
518 .weight_learning_rate(0.1)
519 .forgetting_factor(0.9)
520 .performance_window_size(100)
521 .adaptive_ensemble_size(false)
522 }
523}
524
impl StreamingEnsemble<Trained> {
    /// Processes a single `(x, y)` observation: predicts first (prequential
    /// evaluation), then updates drift detection, every base model, the
    /// model weights, and the bookkeeping windows.
    ///
    /// Returns the prediction made *before* the models saw this sample.
    pub fn partial_fit(&mut self, x: &Array1<Float>, y: Float) -> Result<Float> {
        self.samples_seen_ += 1;

        // Predict before learning; 0.0 is the fallback before any model exists.
        let prediction = if self.model_count() > 0 {
            self.predict_single(x)?
        } else {
            0.0
        };

        // Convert the residual into a 0/1 error signal for the drift
        // detector (absolute error above 0.5 counts as a mistake).
        let drift_detected = if let Some(detector) = &mut self.drift_detector_ {
            let error = if (prediction - y).abs() > 0.5 {
                1.0
            } else {
                0.0
            };
            let (drift, _warning) = detector.update(error);

            if drift {
                self.drift_count_ += 1;
                self.last_drift_position_ = self.samples_seen_;
                self.handle_concept_drift(x.len())?;
            }

            drift
        } else {
            false
        };

        // Incrementally train every base model on the new observation.
        if let Some(models) = &mut self.models_ {
            for model in models.iter_mut() {
                model.partial_fit(x, y)?;
            }
        }

        // Record each model's self-reported performance, bounded per model
        // by `performance_window_size`. The `unwrap` relies on `models_`
        // being Some whenever `model_performance_` is (maintained by
        // `add_new_model` and the `fit` constructor).
        if let Some(performance_tracking) = &mut self.model_performance_ {
            for (i, model) in self.models_.as_ref().unwrap().iter().enumerate() {
                let perf = model.get_performance();
                performance_tracking[i].push_back(perf);

                if performance_tracking[i].len() > self.config.performance_window_size {
                    performance_tracking[i].pop_front();
                }
            }
        }

        self.update_model_weights()?;

        self.update_overall_accuracy(prediction, y);

        // Keep a bounded history of (input, target, prediction) triples for
        // diversity estimation and the add-model heuristic.
        self.recent_predictions_
            .push_back((x.clone(), y, prediction));
        if self.recent_predictions_.len() > 1000 {
            self.recent_predictions_.pop_front();
        }

        // Periodically revisit the ensemble size when adaptation is enabled.
        if self.config.adaptive_ensemble_size && self.samples_seen_ % 100 == 0 {
            self.adjust_ensemble_size(x.len())?;
        }

        if self.should_add_model() || drift_detected {
            self.add_new_model(x.len())?;
        }

        Ok(prediction)
    }

    /// Weighted-average prediction over all base models for one input.
    /// Falls back to an equal weight when no weight vector exists, and to
    /// 0.0 when the ensemble is empty.
    pub fn predict_single(&self, x: &Array1<Float>) -> Result<Float> {
        if let Some(models) = &self.models_ {
            if models.is_empty() {
                return Ok(0.0);
            }

            let mut weighted_sum = 0.0;
            let mut total_weight = 0.0;

            for (i, model) in models.iter().enumerate() {
                let prediction = model.predict(x)?;
                let weight = self
                    .model_weights_
                    .as_ref()
                    .map(|w| w[i])
                    .unwrap_or(1.0 / models.len() as Float);

                weighted_sum += prediction * weight;
                total_weight += weight;
            }

            if total_weight > 0.0 {
                Ok(weighted_sum / total_weight)
            } else {
                Ok(0.0)
            }
        } else {
            Ok(0.0)
        }
    }

    /// Reaction to a detected drift: reset the worst-performing ~30% of
    /// models, inject two fresh models, and reset the drift detector.
    fn handle_concept_drift(&mut self, n_features: usize) -> Result<()> {
        if let Some(models) = &mut self.models_ {
            if let Some(performance_tracking) = &self.model_performance_ {
                // Average tracked performance per model (0.0 when no history).
                let mut performance_scores: Vec<(usize, Float)> = performance_tracking
                    .iter()
                    .enumerate()
                    .map(|(i, perf)| {
                        let avg_perf = if perf.is_empty() {
                            0.0
                        } else {
                            perf.iter().sum::<Float>() / perf.len() as Float
                        };
                        (i, avg_perf)
                    })
                    .collect();

                // Ascending sort: worst performers first.
                performance_scores.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());

                // Reset the bottom 30% (at least one model).
                let reset_count = (models.len() * 30 / 100).max(1);
                for i in 0..reset_count.min(performance_scores.len()) {
                    let model_idx = performance_scores[i].0;
                    models[model_idx].reset()?;
                }
            }
        }

        // Inject fresh capacity for the new concept.
        for _ in 0..2 {
            self.add_new_model(n_features)?;
        }

        // Start monitoring the new concept from a clean slate.
        if let Some(detector) = &mut self.drift_detector_ {
            detector.reset();
        }

        Ok(())
    }

    /// Adaptive sizing: scores each model by performance and diversity, then
    /// either removes the least useful model (when the ensemble is large and
    /// underperforming) or adds one (when small, performing well, but not
    /// diverse enough).
    fn adjust_ensemble_size(&mut self, n_features: usize) -> Result<()> {
        let model_count = self.model_count();
        if model_count < 2 {
            return Ok(());
        }

        let performance_scores = self.calculate_model_performance_scores();
        let diversity_scores = self.calculate_model_diversity_scores()?;

        // Utility = 70% performance + 30% diversity, per model.
        let mut utility_scores: Vec<(usize, Float)> = performance_scores
            .iter()
            .zip(diversity_scores.iter())
            .enumerate()
            .map(|(i, (&perf, &div))| {
                let utility = 0.7 * perf + 0.3 * div;
                (i, utility)
            })
            .collect();

        // Ascending: the least useful model comes first.
        utility_scores.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap());

        let avg_performance =
            performance_scores.iter().sum::<Float>() / performance_scores.len() as Float;
        let current_size = model_count;

        // Shrink: large-and-weak ensembles drop their clearly-worst model.
        if current_size > 3
            && (avg_performance < 0.3 || current_size > self.config.max_models * 3 / 4)
        {
            let worst_utility = utility_scores[0].1;
            let avg_utility = utility_scores.iter().map(|(_, u)| u).sum::<Float>()
                / utility_scores.len() as Float;

            if worst_utility < avg_utility * 0.6 && current_size > 2 {
                let remove_idx = utility_scores[0].0;
                self.remove_model(remove_idx)?;
            }
        }
        // Grow: strong but homogeneous ensembles gain a fresh model.
        else if current_size < self.config.max_models && avg_performance > 0.6 {
            let avg_diversity =
                diversity_scores.iter().sum::<Float>() / diversity_scores.len() as Float;
            if avg_diversity < 0.7 {
                self.add_new_model(n_features)?;
            }
        }

        Ok(())
    }

    /// Per-model mean over the most recent (up to 20) tracked performance
    /// samples; neutral 0.5 where no history exists.
    fn calculate_model_performance_scores(&self) -> Vec<Float> {
        if let Some(performance_tracking) = &self.model_performance_ {
            performance_tracking
                .iter()
                .map(|perf_history| {
                    if perf_history.is_empty() {
                        0.5
                    } else {
                        let recent_window = perf_history.len().min(20);
                        let recent_perf: Float =
                            perf_history.iter().rev().take(recent_window).sum::<Float>();
                        recent_perf / recent_window as Float
                    }
                })
                .collect()
        } else {
            vec![0.5; self.model_count()]
        }
    }

    /// Per-model diversity: mean absolute prediction disagreement with every
    /// other model over up to 100 recent inputs, normalised so the most
    /// diverse model scores 1.0. Single-model ensembles score 1.0 by
    /// definition.
    fn calculate_model_diversity_scores(&self) -> Result<Vec<Float>> {
        let model_count = self.model_count();
        if model_count < 2 {
            return Ok(vec![1.0; model_count]);
        }

        let mut diversity_scores = vec![0.0; model_count];
        let sample_size = self.recent_predictions_.len().min(100);

        if sample_size > 0 {
            // Pairwise disagreement on the recent inputs. O(models^2 * samples);
            // bounded by the 100-sample cap above.
            for (i, model_i) in self.models_.as_ref().unwrap().iter().enumerate() {
                let mut total_diversity = 0.0;
                let mut comparison_count = 0;

                for (j, model_j) in self.models_.as_ref().unwrap().iter().enumerate() {
                    if i != j {
                        let mut diff_sum = 0.0;
                        for (x, _, _) in self.recent_predictions_.iter().rev().take(sample_size) {
                            let pred_i = model_i.predict(x).unwrap_or(0.0);
                            let pred_j = model_j.predict(x).unwrap_or(0.0);
                            diff_sum += (pred_i - pred_j).abs();
                        }
                        total_diversity += diff_sum / sample_size as Float;
                        comparison_count += 1;
                    }
                }

                if comparison_count > 0 {
                    diversity_scores[i] = total_diversity / comparison_count as Float;
                }
            }

            // Normalise to [0, 1] relative to the most diverse model.
            let max_diversity = diversity_scores.iter().fold(0.0f64, |a, &b| a.max(b));
            if max_diversity > 0.0 {
                for score in &mut diversity_scores {
                    *score /= max_diversity;
                }
            }
        }

        Ok(diversity_scores)
    }

    /// Removes the model at `index` along with its weight, performance
    /// history, and age entries; the remaining weights are renormalised.
    /// No-op when the index is out of range or only one model remains.
    fn remove_model(&mut self, index: usize) -> Result<()> {
        if let Some(models) = &mut self.models_ {
            if index < models.len() && models.len() > 1 {
                models.remove(index);

                // Rebuild the weight vector without the removed entry, then
                // renormalise so the weights still sum to 1.
                if let Some(weights) = &mut self.model_weights_ {
                    let mut new_weights = Array1::zeros(models.len());
                    let mut w_idx = 0;
                    for i in 0..weights.len() {
                        if i != index {
                            new_weights[w_idx] = weights[i];
                            w_idx += 1;
                        }
                    }
                    let weight_sum = new_weights.sum();
                    if weight_sum > 0.0 {
                        new_weights /= weight_sum;
                    }
                    *weights = new_weights;
                }

                if let Some(performance) = &mut self.model_performance_ {
                    performance.remove(index);
                }

                if let Some(ages) = &mut self.model_ages_ {
                    ages.remove(index);
                }
            }
        }
        Ok(())
    }

    /// Heuristic for growing the ensemble: grow while under the cap, or when
    /// the mean absolute error over the last 50 predictions exceeds 1.0.
    fn should_add_model(&self) -> bool {
        if let Some(models) = &self.models_ {
            if models.len() < self.config.max_models {
                return true;
            }

            if self.recent_predictions_.len() >= 50 {
                let recent_errors: Vec<Float> = self
                    .recent_predictions_
                    .iter()
                    .rev()
                    .take(50)
                    .map(|(_, true_y, pred_y)| (true_y - pred_y).abs())
                    .collect();

                let recent_error =
                    recent_errors.iter().sum::<Float>() / recent_errors.len() as Float;

                if recent_error > 1.0 {
                    return true;
                }
            }
        } else {
            // No models yet: always add the first one.
            return true;
        }

        false
    }

    /// Appends a fresh linear model (evicting the oldest model when over the
    /// cap), keeps the age/performance vectors in sync, and refreshes the
    /// combination weights.
    fn add_new_model(&mut self, n_features: usize) -> Result<()> {
        let new_model = Box::new(StreamingLinearRegression::new(n_features, 0.01));

        if let Some(models) = &mut self.models_ {
            models.push(new_model);

            // Over the cap: evict the oldest (front) model and its metadata.
            if models.len() > self.config.max_models {
                models.remove(0);

                if let Some(ages) = &mut self.model_ages_ {
                    ages.remove(0);
                }
                if let Some(performance) = &mut self.model_performance_ {
                    performance.remove(0);
                }
            }
        } else {
            self.models_ = Some(vec![new_model]);
        }

        // Metadata entries for the newly appended model.
        if let Some(ages) = &mut self.model_ages_ {
            ages.push(0);
        } else {
            self.model_ages_ = Some(vec![0]);
        }

        if let Some(performance) = &mut self.model_performance_ {
            performance.push(VecDeque::new());
        } else {
            self.model_performance_ = Some(vec![VecDeque::new()]);
        }

        self.update_model_weights()?;

        Ok(())
    }

    /// Recomputes combination weights from each model's recent (last 10)
    /// tracked performance, normalised to sum to 1; falls back to uniform
    /// weights when no performance signal is available.
    fn update_model_weights(&mut self) -> Result<()> {
        if let Some(models) = &self.models_ {
            let n_models = models.len();
            if n_models == 0 {
                return Ok(());
            }

            let mut weights = Array1::zeros(n_models);

            if let Some(performance_tracking) = &self.model_performance_ {
                for (i, perf_history) in performance_tracking.iter().enumerate() {
                    if perf_history.is_empty() {
                        // No signal yet: provisional uniform weight.
                        weights[i] = 1.0 / n_models as Float;
                    } else {
                        let recent_perf: Float = perf_history.iter().rev().take(10).sum::<Float>()
                            / perf_history.len().min(10) as Float;

                        weights[i] = recent_perf;
                    }
                }

                // Normalise; degenerate all-zero case falls back to uniform.
                let weight_sum = weights.sum();
                if weight_sum > 0.0 {
                    weights /= weight_sum;
                } else {
                    weights.fill(1.0 / n_models as Float);
                }
            } else {
                weights.fill(1.0 / n_models as Float);
            }

            self.model_weights_ = Some(weights);
        }

        Ok(())
    }

    /// Folds one prediction outcome into the running accuracy as an
    /// exponential moving average (alpha = 0.01); a prediction within 0.5 of
    /// the target counts as accurate.
    fn update_overall_accuracy(&mut self, prediction: Float, true_value: Float) {
        let error = (prediction - true_value).abs();
        let accuracy = if error < 0.5 { 1.0 } else { 0.0 };

        let alpha = 0.01;
        self.overall_accuracy_ = alpha * accuracy + (1.0 - alpha) * self.overall_accuracy_;
    }
}
969
/// Wrapper intended to periodically adapt a [`StreamingEnsemble`]'s
/// hyperparameters based on recent performance.
///
/// NOTE(review): only construction is implemented in this file; the
/// adaptation fields are not consulted by any code visible here.
pub struct AdaptiveStreamingEnsemble<State = Untrained> {
    // The wrapped ensemble being adapted.
    base_ensemble: StreamingEnsemble<State>,
    // Parameters governing when/how adaptation happens.
    adaptation_config: AdaptationConfig,
    // Recent performance samples for adaptation decisions.
    performance_history: VecDeque<Float>,
    // Sample index of the most recent adaptation.
    last_adaptation: usize,
}
977
/// Parameters for [`AdaptiveStreamingEnsemble`].
///
/// NOTE(review): none of these fields are read by code in this file yet.
#[derive(Debug, Clone)]
pub struct AdaptationConfig {
    /// Samples between adaptation passes.
    pub adaptation_interval: usize,
    /// Performance change required to trigger adaptation.
    pub performance_threshold: Float,
    /// Largest allowed ensemble-size change per adaptation.
    pub max_size_adjustment: i32,
    /// Multiplicative factor applied when adjusting learning rates.
    pub lr_adjustment_factor: Float,
}
989
impl Default for AdaptationConfig {
    /// Conservative defaults: adapt at most every 1000 samples.
    fn default() -> Self {
        Self {
            adaptation_interval: 1000,
            performance_threshold: 0.05,
            max_size_adjustment: 3,
            lr_adjustment_factor: 1.1,
        }
    }
}
1000
impl Default for AdaptiveStreamingEnsemble<Untrained> {
    /// Equivalent to [`AdaptiveStreamingEnsemble::new`].
    fn default() -> Self {
        Self::new()
    }
}
1006
1007impl AdaptiveStreamingEnsemble<Untrained> {
1008 pub fn new() -> Self {
1009 Self {
1010 base_ensemble: StreamingEnsemble::new(),
1011 adaptation_config: AdaptationConfig::default(),
1012 performance_history: VecDeque::new(),
1013 last_adaptation: 0,
1014 }
1015 }
1016
1017 pub fn with_base(base: StreamingEnsemble<Untrained>) -> Self {
1019 Self {
1020 base_ensemble: base,
1021 adaptation_config: AdaptationConfig::default(),
1022 performance_history: VecDeque::new(),
1023 last_adaptation: 0,
1024 }
1025 }
1026}
1027
impl Estimator for StreamingEnsemble<Untrained> {
    // Associated types required by the `Estimator` trait.
    type Config = StreamingConfig;
    type Error = SklearsError;
    type Float = Float;

    /// Exposes the builder-configured streaming configuration.
    fn config(&self) -> &Self::Config {
        &self.config
    }
}
1038
1039impl Fit<Array2<Float>, Array1<Float>> for StreamingEnsemble<Untrained> {
1040 type Fitted = StreamingEnsemble<Trained>;
1041
1042 fn fit(self, x: &Array2<Float>, y: &Array1<Float>) -> Result<Self::Fitted> {
1043 let n_samples = x.nrows();
1044 let n_features = x.ncols();
1045
1046 if n_samples != y.len() {
1047 return Err(SklearsError::ShapeMismatch {
1048 expected: format!("{} samples", n_samples),
1049 actual: format!("{} samples", y.len()),
1050 });
1051 }
1052
1053 let config = self.config.clone();
1055 let mut ensemble = StreamingEnsemble::<Trained> {
1056 config: config.clone(),
1057 state: PhantomData,
1058 models_: Some(Vec::new()),
1059 model_weights_: None,
1060 model_ages_: Some(Vec::new()),
1061 model_performance_: Some(Vec::new()),
1062 drift_detector_: if config.enable_drift_detection {
1063 Some(ConceptDriftDetector::new(
1064 DriftDetectionMethod::ADWIN,
1065 config.drift_threshold,
1066 ))
1067 } else {
1068 None
1069 },
1070 samples_seen_: 0,
1071 drift_count_: 0,
1072 last_drift_position_: 0,
1073 overall_accuracy_: 0.0,
1074 recent_predictions_: VecDeque::new(),
1075 };
1076
1077 ensemble.add_new_model(n_features)?;
1079
1080 for i in 0..n_samples {
1082 let x_sample = x.row(i).to_owned();
1083 let y_sample = y[i];
1084 ensemble.partial_fit(&x_sample, y_sample)?;
1085 }
1086
1087 Ok(ensemble)
1088 }
1089}
1090
1091impl Predict<Array2<Float>, Array1<Float>> for StreamingEnsemble<Trained> {
1092 fn predict(&self, x: &Array2<Float>) -> Result<Array1<Float>> {
1093 let mut predictions = Array1::zeros(x.nrows());
1094
1095 for (i, row) in x.axis_iter(Axis(0)).enumerate() {
1096 predictions[i] = self.predict_single(&row.to_owned())?;
1097 }
1098
1099 Ok(predictions)
1100 }
1101}
1102
impl Default for StreamingEnsemble<Untrained> {
    /// Equivalent to [`StreamingEnsemble::new`].
    fn default() -> Self {
        Self::new()
    }
}
1108
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use super::*;
    use scirs2_core::ndarray::array;

    /// The error-rate detector must stay quiet on a steady low error rate
    /// and may (but is not required to) fire once errors jump.
    #[test]
    fn test_concept_drift_detector() {
        let mut detector = ConceptDriftDetector::new(DriftDetectionMethod::ErrorRate, 0.3);

        // Low error rate: never reported as drift.
        for _ in 0..50 {
            let (drift, _warning) = detector.update(0.1);
            assert!(!drift);
        }

        // Much higher error rate: drift may be detected at some point.
        for _ in 0..30 {
            let (drift, _warning) = detector.update(0.8);
            if drift {
                break;
            }
        }
    }

    /// End-to-end fit + predict smoke test on a small linear dataset.
    #[test]
    fn test_streaming_ensemble_basic() {
        let x = Array2::from_shape_vec(
            (20, 2),
            vec![
                1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 5.0, 5.0, 6.0, 6.0, 7.0, 7.0, 8.0, 8.0, 9.0,
                9.0, 10.0, 10.0, 11.0, 11.0, 12.0, 12.0, 13.0, 13.0, 14.0, 14.0, 15.0, 15.0, 16.0,
                16.0, 17.0, 17.0, 18.0, 18.0, 19.0, 19.0, 20.0, 20.0, 21.0,
            ],
        )
        .unwrap();

        let y = Array1::from_shape_vec(20, (0..20).map(|i| i as Float).collect()).unwrap();

        let ensemble = StreamingEnsemble::new()
            .max_models(5)
            .enable_drift_detection(true);

        let trained = ensemble.fit(&x, &y).unwrap();

        // Every sample was replayed through partial_fit during fit.
        assert!(trained.model_count() > 0);
        assert!(trained.samples_seen() == 20);

        let predictions = trained.predict(&x).unwrap();
        assert_eq!(predictions.len(), x.nrows());
    }

    /// Incremental updates after fit keep the sample counter in sync and
    /// produce finite predictions.
    #[test]
    fn test_streaming_partial_fit() {
        let ensemble = StreamingEnsemble::new()
            .max_models(3)
            .enable_drift_detection(false);

        let x = Array2::from_shape_vec(
            (5, 2),
            vec![1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 5.0, 5.0, 6.0],
        )
        .unwrap();
        let y = array![3.0, 5.0, 7.0, 9.0, 11.0];

        let mut trained = ensemble.fit(&x, &y).unwrap();

        // One extra online update on top of the 5 samples seen during fit.
        let x_new = array![6.0, 7.0];
        let prediction = trained.partial_fit(&x_new, 13.0).unwrap();

        assert!(trained.samples_seen() == 6);
        assert!(!prediction.is_nan());

        // Stream several more samples from the same linear concept.
        for i in 7..15 {
            let x_new = array![i as Float, (i + 1) as Float];
            trained.partial_fit(&x_new, (2 * i + 1) as Float).unwrap();
        }

        assert!(trained.samples_seen() == 14);
    }

    /// Trains on one linear concept, then streams a different concept; the
    /// ensemble must process the full stream without error.
    #[test]
    fn test_concept_drift_adaptation() {
        let ensemble = StreamingEnsemble::for_concept_drift();

        // Concept 1: y = 2x + 1.
        let mut x_data = Vec::new();
        let mut y_data = Vec::new();

        for i in 0..50 {
            x_data.extend_from_slice(&[i as Float, (i * 2) as Float]);
            y_data.push(i as Float * 2.0 + 1.0);
        }

        let x1 = Array2::from_shape_vec((50, 2), x_data).unwrap();
        let y1 = Array1::from_vec(y_data);

        let mut trained = ensemble.fit(&x1, &y1).unwrap();
        let initial_models = trained.model_count();

        // Concept 2: y = 0.5x + 10 (a deliberate drift).
        let mut x_data2 = Vec::new();
        let mut y_data2 = Vec::new();

        for i in 50..100 {
            x_data2.extend_from_slice(&[i as Float, (i * 2) as Float]);
            y_data2.push(i as Float * 0.5 + 10.0);
        }

        for i in 0..50 {
            let x_sample = array![x_data2[i * 2], x_data2[i * 2 + 1]];
            trained.partial_fit(&x_sample, y_data2[i]).unwrap();
        }

        assert!(trained.samples_seen() == 100);
    }

    /// Direct test of the base linear model; ignored because plain SGD on
    /// unscaled features converges loosely (the assertion is very permissive).
    #[test]
    #[ignore]
    fn test_streaming_model_performance() {
        let mut model = StreamingLinearRegression::new(2, 0.01);

        // Train on y = 2 * x0 + 1 (x1 is redundant: x1 = 2 * x0).
        for i in 0..20 {
            let x = array![i as Float, (i * 2) as Float];
            let y = i as Float * 2.0 + 1.0;
            model.partial_fit(&x, y).unwrap();
        }

        let test_x = array![10.0, 20.0];
        let prediction = model.predict(&test_x).unwrap();

        println!(
            "Prediction: {}, Expected: 21.0, Difference: {}",
            prediction,
            (prediction - 21.0).abs()
        );

        // Loose bound: SGD without feature scaling is far from exact here.
        assert!((prediction - 21.0).abs() < 50.0);
        let performance = model.get_performance();
        assert!(performance > 0.0 && performance <= 1.0);
    }

    /// Adaptive sizing must keep the model count within [1, max_models]
    /// while streaming, and predictions must remain finite.
    #[test]
    fn test_dynamic_ensemble_size_adjustment() {
        let mut ensemble = StreamingEnsemble::new()
            .max_models(8)
            .adaptive_ensemble_size(true)
            .performance_window_size(50);

        let (n_samples, n_features) = (200, 2);
        let x = Array2::from_shape_fn((n_samples, n_features), |(i, j)| {
            (i as Float + j as Float) / 10.0
        });
        let y = Array1::from_shape_fn(n_samples, |i| (i % 2) as Float);

        let mut ensemble = ensemble.fit(&x, &y).unwrap();
        let initial_count = ensemble.model_count();

        // Stream enough samples to trigger several size-adjustment passes
        // (adjustment runs every 100 samples).
        for i in 0..300 {
            let x_sample = Array1::from_shape_fn(n_features, |j| (i as Float + j as Float) / 10.0);
            let y_sample = (i % 2) as Float;

            let _pred = ensemble.partial_fit(&x_sample, y_sample).unwrap();
        }

        let final_count = ensemble.model_count();

        assert!(final_count >= 1, "Should maintain at least one model");
        assert!(final_count <= 8, "Should not exceed maximum models");

        let test_x = Array1::from_shape_fn(n_features, |j| j as Float);
        let prediction = ensemble.predict_single(&test_x).unwrap();
        assert!(prediction.is_finite(), "Prediction should be finite");
    }
}