1use scirs2_core::numeric::Float;
7use std::collections::VecDeque;
8use std::iter::Sum;
9use std::time::{Duration, Instant};
10
11#[allow(unused_imports)]
12use crate::error::Result;
13
/// Algorithm used to detect concept drift in a stream of losses/errors.
#[derive(Debug, Clone, Copy, PartialEq)]
#[allow(dead_code)]
pub enum DriftDetectionMethod {
    /// Page-Hinkley cumulative-sum test on the loss signal.
    PageHinkley,
    /// ADWIN-style adaptive windowing (split-window mean comparison).
    Adwin,
    /// Drift Detection Method (DDM) on the binary prediction-error stream.
    // NOTE(review): this variant shadows the enum's own name; `Ddm` would be
    // clearer, but renaming would break existing `match` arms and callers.
    DriftDetectionMethod,
    /// Early Drift Detection Method variant (no standalone detector in this
    /// file; falls back to DDM when selected).
    EarlyDriftDetection,
    /// Statistical-test based detection (no standalone detector in this file).
    StatisticalTest,
    /// Majority vote over all available detectors.
    Ensemble,
}
31
/// Configuration shared by the drift detectors in this module.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub struct DriftDetectorConfig {
    /// Algorithm to use when `enable_ensemble` is false.
    pub method: DriftDetectionMethod,
    /// Minimum samples before detection may trigger.
    // NOTE(review): not referenced in the code visible here — presumably
    // consumed by callers; verify before removing.
    pub min_samples: usize,
    /// Main drift threshold (interpretation depends on `method`).
    pub threshold: f64,
    /// Maximum sliding-window length (used by the ADWIN detector).
    pub window_size: usize,
    /// Significance / delta parameter (passed to ADWIN as `delta`).
    pub alpha: f64,
    /// Threshold at which a pre-drift warning is raised.
    pub warningthreshold: f64,
    /// When true, combine Page-Hinkley, ADWIN and DDM by majority vote.
    pub enable_ensemble: bool,
}
51
52impl Default for DriftDetectorConfig {
53 fn default() -> Self {
54 Self {
55 method: DriftDetectionMethod::PageHinkley,
56 min_samples: 30,
57 threshold: 3.0,
58 window_size: 100,
59 alpha: 0.005,
60 warningthreshold: 2.0,
61 enable_ensemble: false,
62 }
63 }
64}
65
/// Outcome of feeding one observation to a drift detector.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DriftStatus {
    /// No evidence of drift.
    Stable,
    /// Evidence is accumulating but below the drift threshold.
    Warning,
    /// Drift confirmed; detectors typically reset after reporting this.
    Drift,
}
76
/// Record of a single confirmed drift occurrence.
#[derive(Debug, Clone)]
pub struct DriftEvent<A: Float + Send + Sync> {
    /// When the drift was detected.
    pub timestamp: Instant,
    /// Detector confidence in the event (this file stores a fixed 0.8).
    pub confidence: A,
    /// Temporal shape of the drift (sudden, gradual, ...).
    pub drift_type: DriftType,
    /// Suggested reaction for the learning system.
    pub adaptation_recommendation: AdaptationRecommendation,
}
89
/// Temporal classification of a drift event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DriftType {
    /// Abrupt change between consecutive observations.
    Sudden,
    /// Slow transition spread over many observations.
    Gradual,
    /// Step-wise accumulation of small changes.
    Incremental,
    /// A previously seen concept reappearing.
    Recurring,
    /// Short transient change that reverts.
    Blip,
}
104
/// Action suggested to the learning system after a drift is detected.
#[derive(Debug, Clone)]
pub enum AdaptationRecommendation {
    /// Discard learned state and restart.
    Reset,
    /// Multiply the learning rate by `factor` (> 1) to adapt faster.
    IncreaseLearningRate { factor: f64 },
    /// Multiply the learning rate by `factor` (< 1) to stabilize.
    DecreaseLearningRate { factor: f64 },
    /// Switch to the named optimizer.
    SwitchOptimizer { new_optimizer: String },
    /// Resize the detector's observation window.
    AdjustWindow { new_size: usize },
    /// No change required.
    NoAction,
}
121
122#[derive(Debug, Clone)]
124pub struct PageHinkleyDetector<A: Float + Send + Sync> {
125 sum: A,
127 min_sum: A,
129 threshold: A,
131 warningthreshold: A,
133 sample_count: usize,
135 last_drift: Option<Instant>,
137}
138
139impl<A: Float + Send + Sync + Send + Sync> PageHinkleyDetector<A> {
140 pub fn new(threshold: A, warningthreshold: A) -> Self {
142 Self {
143 sum: A::zero(),
144 min_sum: A::zero(),
145 threshold,
146 warningthreshold,
147 sample_count: 0,
148 last_drift: None,
149 }
150 }
151
152 pub fn update(&mut self, loss: A) -> DriftStatus {
154 self.sample_count += 1;
155
156 let mean_loss = A::from(0.1).expect("unwrap failed"); self.sum = self.sum + loss - mean_loss;
159
160 if self.sum < self.min_sum {
162 self.min_sum = self.sum;
163 }
164
165 let test_stat = self.sum - self.min_sum;
167
168 if test_stat > self.threshold {
169 self.last_drift = Some(Instant::now());
170 self.reset();
171 DriftStatus::Drift
172 } else if test_stat > self.warningthreshold {
173 DriftStatus::Warning
174 } else {
175 DriftStatus::Stable
176 }
177 }
178
179 pub fn reset(&mut self) {
181 self.sum = A::zero();
182 self.min_sum = A::zero();
183 self.sample_count = 0;
184 }
185}
186
/// Simplified ADWIN-style detector: keeps a bounded sliding window and
/// compares the means of its two halves.
#[derive(Debug, Clone)]
pub struct AdwinDetector<A: Float + Send + Sync> {
    /// Sliding window of recent observations.
    window: VecDeque<A>,
    /// Hard cap on the window length.
    max_windowsize: usize,
    /// Confidence parameter.
    // NOTE(review): `delta` is stored but not referenced by the simplified
    // split-window test below — confirm whether it should feed the threshold.
    delta: A,
    /// Minimum samples before change detection runs.
    min_window_size: usize,
}
199
200impl<A: Float + Sum + Send + Sync + Send + Sync> AdwinDetector<A> {
201 pub fn new(delta: A, max_windowsize: usize) -> Self {
203 Self {
204 window: VecDeque::new(),
205 max_windowsize,
206 delta,
207 min_window_size: 10,
208 }
209 }
210
211 pub fn update(&mut self, value: A) -> DriftStatus {
213 self.window.push_back(value);
214
215 if self.window.len() > self.max_windowsize {
217 self.window.pop_front();
218 }
219
220 if self.window.len() >= self.min_window_size {
222 if self.detect_change() {
223 self.shrink_window();
224 DriftStatus::Drift
225 } else {
226 DriftStatus::Stable
227 }
228 } else {
229 DriftStatus::Stable
230 }
231 }
232
233 fn detect_change(&self) -> bool {
235 let n = self.window.len();
236 if n < 2 {
237 return false;
238 }
239
240 let mid = n / 2;
242
243 let first_half: Vec<_> = self.window.iter().take(mid).cloned().collect();
244 let second_half: Vec<_> = self.window.iter().skip(mid).cloned().collect();
245
246 let mean1 = first_half.iter().cloned().sum::<A>()
247 / A::from(first_half.len()).expect("unwrap failed");
248 let mean2 = second_half.iter().cloned().sum::<A>()
249 / A::from(second_half.len()).expect("unwrap failed");
250
251 let var1 = first_half
253 .iter()
254 .map(|&x| {
255 let diff = x - mean1;
256 diff * diff
257 })
258 .sum::<A>()
259 / A::from(first_half.len()).expect("unwrap failed");
260
261 let var2 = second_half
262 .iter()
263 .map(|&x| {
264 let diff = x - mean2;
265 diff * diff
266 })
267 .sum::<A>()
268 / A::from(second_half.len()).expect("unwrap failed");
269
270 let diff = (mean1 - mean2).abs();
272 let threshold = (var1 + var2 + A::from(0.01).expect("unwrap failed")).sqrt();
273
274 diff > threshold
275 }
276
277 fn shrink_window(&mut self) {
279 let new_size = self.window.len() / 2;
280 while self.window.len() > new_size {
281 self.window.pop_front();
282 }
283 }
284}
285
/// Drift Detection Method (DDM): monitors the binomial error rate of a
/// predictor and its standard deviation.
#[derive(Debug, Clone)]
pub struct DdmDetector<A: Float + Send + Sync> {
    /// Current empirical error rate p.
    error_rate: A,
    /// Standard deviation sqrt(p(1-p)/n) of the error rate.
    error_std: A,
    /// Best (lowest) p + 2s level observed — the warning reference.
    min_error_plus_2_std: A,
    /// p + 3s recorded at the same best point — the drift reference.
    min_error_plus_3_std: A,
    /// Total observations since the last reset.
    sample_count: usize,
    /// Erroneous observations since the last reset.
    error_count: usize,
}
302
303impl<A: Float + Send + Sync + Send + Sync> DdmDetector<A> {
304 pub fn new() -> Self {
306 Self {
307 error_rate: A::zero(),
308 error_std: A::one(),
309 min_error_plus_2_std: A::from(f64::MAX).expect("unwrap failed"),
310 min_error_plus_3_std: A::from(f64::MAX).expect("unwrap failed"),
311 sample_count: 0,
312 error_count: 0,
313 }
314 }
315
316 pub fn update(&mut self, iserror: bool) -> DriftStatus {
318 self.sample_count += 1;
319 if iserror {
320 self.error_count += 1;
321 }
322
323 if self.sample_count < 30 {
324 return DriftStatus::Stable;
325 }
326
327 self.error_rate =
329 A::from(self.error_count as f64 / self.sample_count as f64).expect("unwrap failed");
330 let p = self.error_rate;
331 let n = A::from(self.sample_count as f64).expect("unwrap failed");
332 self.error_std = (p * (A::one() - p) / n).sqrt();
333
334 let current_level = self.error_rate + A::from(2.0).expect("unwrap failed") * self.error_std;
335
336 if current_level < self.min_error_plus_2_std {
338 self.min_error_plus_2_std = current_level;
339 self.min_error_plus_3_std =
340 self.error_rate + A::from(3.0).expect("unwrap failed") * self.error_std;
341 }
342
343 if current_level > self.min_error_plus_3_std {
345 self.reset();
346 DriftStatus::Drift
347 } else if current_level > self.min_error_plus_2_std {
348 DriftStatus::Warning
349 } else {
350 DriftStatus::Stable
351 }
352 }
353
354 pub fn reset(&mut self) {
356 self.sample_count = 0;
357 self.error_count = 0;
358 self.error_rate = A::zero();
359 self.error_std = A::one();
360 self.min_error_plus_2_std = A::from(f64::MAX).expect("unwrap failed");
361 self.min_error_plus_3_std = A::from(f64::MAX).expect("unwrap failed");
362 }
363}
364
365impl<A: Float + Send + Sync + Send + Sync> Default for DdmDetector<A> {
366 fn default() -> Self {
367 Self::new()
368 }
369}
370
/// Multi-algorithm concept-drift detector: runs Page-Hinkley, ADWIN and DDM
/// in parallel and reports either a single method's verdict or an ensemble
/// majority vote, while recording drift events and performance history.
pub struct ConceptDriftDetector<A: Float + Send + Sync> {
    /// Method selection, thresholds, and ensemble flag.
    config: DriftDetectorConfig,

    /// Page-Hinkley detector fed with raw loss values.
    ph_detector: PageHinkleyDetector<A>,

    /// ADWIN detector fed with raw loss values.
    adwin_detector: AdwinDetector<A>,

    /// DDM detector fed with binary prediction-error flags.
    ddm_detector: DdmDetector<A>,

    /// Recent ensemble decisions.
    // NOTE(review): allocated with capacity 10 but never written in the code
    // visible here — confirm intended use or remove.
    ensemble_history: VecDeque<DriftStatus>,

    /// Chronological log of confirmed drift events.
    drift_events: Vec<DriftEvent<A>>,

    /// Rolling performance history used for adaptation recommendations.
    performance_tracker: PerformanceDriftTracker<A>,
}
394
395impl<A: Float + std::fmt::Debug + Sum + Send + Sync + Send + Sync> ConceptDriftDetector<A> {
396 pub fn new(config: DriftDetectorConfig) -> Self {
398 let threshold = A::from(config.threshold).expect("unwrap failed");
399 let warningthreshold = A::from(config.warningthreshold).expect("unwrap failed");
400 let delta = A::from(config.alpha).expect("unwrap failed");
401
402 Self {
403 ph_detector: PageHinkleyDetector::new(threshold, warningthreshold),
404 adwin_detector: AdwinDetector::new(delta, config.window_size),
405 ddm_detector: DdmDetector::new(),
406 ensemble_history: VecDeque::with_capacity(10),
407 drift_events: Vec::new(),
408 performance_tracker: PerformanceDriftTracker::new(),
409 config,
410 }
411 }
412
413 pub fn update(&mut self, loss: A, is_predictionerror: bool) -> Result<DriftStatus> {
415 let ph_status = self.ph_detector.update(loss);
416 let adwin_status = self.adwin_detector.update(loss);
417 let ddm_status = self.ddm_detector.update(is_predictionerror);
418
419 let final_status = if self.config.enable_ensemble {
420 self.ensemble_vote(ph_status, adwin_status, ddm_status)
421 } else {
422 match self.config.method {
423 DriftDetectionMethod::PageHinkley => ph_status,
424 DriftDetectionMethod::Adwin => adwin_status,
425 DriftDetectionMethod::DriftDetectionMethod => ddm_status,
426 _ => ddm_status, }
428 };
429
430 if final_status == DriftStatus::Drift {
432 let event = DriftEvent {
433 timestamp: Instant::now(),
434 confidence: A::from(0.8).expect("unwrap failed"), drift_type: self.classify_drift_type(),
436 adaptation_recommendation: self.generate_adaptation_recommendation(),
437 };
438 self.drift_events.push(event);
439 }
440
441 self.performance_tracker.update(loss, final_status);
443
444 Ok(final_status)
445 }
446
447 fn ensemble_vote(
449 &mut self,
450 ph: DriftStatus,
451 adwin: DriftStatus,
452 ddm: DriftStatus,
453 ) -> DriftStatus {
454 let votes = [ph, adwin, ddm];
455
456 let drift_votes = votes.iter().filter(|&&s| s == DriftStatus::Drift).count();
458 let warning_votes = votes.iter().filter(|&&s| s == DriftStatus::Warning).count();
459
460 if drift_votes >= 2 {
461 DriftStatus::Drift
462 } else if warning_votes >= 2 || drift_votes >= 1 {
463 DriftStatus::Warning
464 } else {
465 DriftStatus::Stable
466 }
467 }
468
469 fn classify_drift_type(&self) -> DriftType {
471 if self.drift_events.len() < 2 {
473 return DriftType::Sudden;
474 }
475
476 let recent_events = self.drift_events.iter().rev().take(5);
477 let time_intervals: Vec<_> = recent_events
478 .map(|event| event.timestamp)
479 .collect::<Vec<_>>()
480 .windows(2)
481 .map(|window| window[0].duration_since(window[1]))
482 .collect();
483
484 if time_intervals.iter().all(|&d| d < Duration::from_secs(60)) {
485 DriftType::Sudden
486 } else if time_intervals.len() > 2 {
487 DriftType::Gradual
488 } else {
489 DriftType::Incremental
490 }
491 }
492
493 fn generate_adaptation_recommendation(&self) -> AdaptationRecommendation {
495 let recent_performance = self.performance_tracker.get_recent_performance_change();
496
497 if recent_performance > A::from(0.5).expect("unwrap failed") {
498 AdaptationRecommendation::Reset
500 } else if recent_performance > A::from(0.2).expect("unwrap failed") {
501 AdaptationRecommendation::IncreaseLearningRate { factor: 1.5 }
503 } else if recent_performance < A::from(-0.1).expect("unwrap failed") {
504 AdaptationRecommendation::DecreaseLearningRate { factor: 0.8 }
506 } else {
507 AdaptationRecommendation::NoAction
508 }
509 }
510
511 pub fn get_statistics(&self) -> DriftStatistics<A> {
513 DriftStatistics {
514 total_drifts: self.drift_events.len(),
515 recent_drift_rate: self.calculate_recent_drift_rate(),
516 average_drift_confidence: self.calculate_average_confidence(),
517 drift_types_distribution: self.calculate_drift_type_distribution(),
518 time_since_last_drift: self.time_since_last_drift(),
519 }
520 }
521
522 fn calculate_recent_drift_rate(&self) -> f64 {
523 let one_hour_ago = Instant::now() - Duration::from_secs(3600);
525 let recent_drifts = self
526 .drift_events
527 .iter()
528 .filter(|event| event.timestamp > one_hour_ago)
529 .count();
530 recent_drifts as f64 / 3600.0 }
532
533 fn calculate_average_confidence(&self) -> Option<A> {
534 if self.drift_events.is_empty() {
535 None
536 } else {
537 let sum = self
538 .drift_events
539 .iter()
540 .map(|event| event.confidence)
541 .sum::<A>();
542 Some(sum / A::from(self.drift_events.len()).expect("unwrap failed"))
543 }
544 }
545
546 fn calculate_drift_type_distribution(&self) -> std::collections::HashMap<DriftType, usize> {
547 let mut distribution = std::collections::HashMap::new();
548 for event in &self.drift_events {
549 *distribution.entry(event.drift_type).or_insert(0) += 1;
550 }
551 distribution
552 }
553
554 fn time_since_last_drift(&self) -> Option<Duration> {
555 self.drift_events
556 .last()
557 .map(|event| event.timestamp.elapsed())
558 }
559}
560
/// Bounded rolling history of (performance, drift status, timestamp)
/// observations, used to quantify recent performance trends.
#[derive(Debug, Clone)]
struct PerformanceDriftTracker<A: Float + Send + Sync> {
    /// Newest entries at the back; oldest evicted past `window_size`.
    performance_history: VecDeque<(A, DriftStatus, Instant)>,
    /// Maximum number of retained observations.
    window_size: usize,
}
569
570impl<A: Float + std::iter::Sum + Send + Sync + Send + Sync> PerformanceDriftTracker<A> {
571 fn new() -> Self {
572 Self {
573 performance_history: VecDeque::new(),
574 window_size: 100,
575 }
576 }
577
578 fn update(&mut self, performance: A, driftstatus: DriftStatus) {
579 self.performance_history
580 .push_back((performance, driftstatus, Instant::now()));
581
582 if self.performance_history.len() > self.window_size {
584 self.performance_history.pop_front();
585 }
586 }
587
588 fn get_recent_performance_change(&self) -> A {
590 if self.performance_history.len() < 10 {
591 return A::zero();
592 }
593
594 let recent: Vec<_> = self.performance_history.iter().rev().take(10).collect();
595 let older: Vec<_> = self
596 .performance_history
597 .iter()
598 .rev()
599 .skip(10)
600 .take(10)
601 .collect();
602
603 if older.is_empty() {
604 return A::zero();
605 }
606
607 let recent_avg = recent.iter().map(|(p, _, _)| *p).sum::<A>()
608 / A::from(recent.len()).expect("unwrap failed");
609 let older_avg = older.iter().map(|(p, _, _)| *p).sum::<A>()
610 / A::from(older.len()).expect("unwrap failed");
611
612 recent_avg - older_avg
613 }
614}
615
/// Aggregate statistics over a detector's recorded drift events.
#[derive(Debug, Clone)]
pub struct DriftStatistics<A: Float + Send + Sync> {
    /// Total number of recorded drift events.
    pub total_drifts: usize,
    /// Drifts per second over the last hour.
    pub recent_drift_rate: f64,
    /// Mean event confidence, or `None` when no events were recorded.
    pub average_drift_confidence: Option<A>,
    /// Histogram of drift types.
    pub drift_types_distribution: std::collections::HashMap<DriftType, usize>,
    /// Elapsed time since the most recent drift, if any.
    pub time_since_last_drift: Option<Duration>,
}
630
631pub mod advanced_drift_analysis {
633 use super::*;
634 use std::collections::HashMap;
635
636 #[derive(Debug)]
638 pub struct AdvancedDriftDetector<A: Float + Send + Sync> {
639 base_detectors: Vec<Box<dyn DriftDetectorTrait<A>>>,
641
642 pattern_analyzer: DriftPatternAnalyzer<A>,
644
645 threshold_manager: AdaptiveThresholdManager<A>,
647
648 context_detector: ContextAwareDriftDetector<A>,
650
651 impact_analyzer: DriftImpactAnalyzer<A>,
653
654 adaptation_selector: AdaptationStrategySelector<A>,
656
657 drift_database: DriftDatabase<A>,
659 }
660
661 pub trait DriftDetectorTrait<A: Float + Send + Sync>: std::fmt::Debug {
663 fn update(&mut self, value: A) -> DriftStatus;
664 fn reset(&mut self);
665 fn get_confidence(&self) -> A;
666 }
667
668 #[derive(Debug)]
670 pub struct DriftPatternAnalyzer<A: Float + Send + Sync> {
671 pattern_buffer: VecDeque<PatternFeatures<A>>,
673
674 known_patterns: HashMap<String, DriftPattern<A>>,
676
677 matching_threshold: A,
679
680 feature_extractors: Vec<Box<dyn FeatureExtractor<A>>>,
682 }
683
684 #[derive(Debug, Clone)]
686 pub struct PatternFeatures<A: Float + Send + Sync> {
687 pub mean: A,
689 pub variance: A,
690 pub skewness: A,
691 pub kurtosis: A,
692
693 pub trend_slope: A,
695 pub trend_strength: A,
696
697 pub dominant_frequency: A,
699 pub spectral_entropy: A,
700
701 pub temporal_locality: A,
703 pub persistence: A,
704
705 pub entropy: A,
707 pub fractal_dimension: A,
708 }
709
710 #[derive(Debug, Clone)]
712 pub struct DriftPattern<A: Float + Send + Sync> {
713 pub id: String,
715
716 pub features: PatternFeatures<A>,
718
719 pub pattern_type: DriftType,
721
722 pub typical_duration: Duration,
724
725 pub optimal_adaptation: AdaptationRecommendation,
727
728 pub adaptation_success_rate: A,
730
731 pub occurrence_count: usize,
733 }
734
735 pub trait FeatureExtractor<A: Float + Send + Sync>: std::fmt::Debug {
737 fn extract(&self, data: &[A]) -> A;
738 fn name(&self) -> &str;
739 }
740
741 #[derive(Debug)]
743 pub struct AdaptiveThresholdManager<A: Float + Send + Sync> {
744 thresholds: HashMap<String, A>,
746
747 threshold_history: VecDeque<ThresholdUpdate<A>>,
749
750 performance_feedback: VecDeque<PerformanceFeedback<A>>,
752
753 learning_rate: A,
755 }
756
757 #[derive(Debug, Clone)]
759 pub struct ThresholdUpdate<A: Float + Send + Sync> {
760 pub detector_name: String,
761 pub old_threshold: A,
762 pub new_threshold: A,
763 pub timestamp: Instant,
764 pub reason: String,
765 }
766
767 #[derive(Debug, Clone)]
769 pub struct PerformanceFeedback<A: Float + Send + Sync> {
770 pub true_positive_rate: A,
771 pub false_positive_rate: A,
772 pub detection_delay: Duration,
773 pub adaptation_effectiveness: A,
774 pub timestamp: Instant,
775 }
776
777 #[derive(Debug)]
779 pub struct ContextAwareDriftDetector<A: Float + Send + Sync> {
780 context_features: Vec<ContextFeature<A>>,
782
783 context_models: HashMap<String, Box<dyn DriftDetectorTrait<A>>>,
785
786 current_context: Option<String>,
788
789 transition_matrix: HashMap<(String, String), A>,
791 }
792
793 #[derive(Debug, Clone)]
795 pub struct ContextFeature<A: Float + Send + Sync> {
796 pub name: String,
797 pub value: A,
798 pub importance_weight: A,
799 pub temporal_stability: A,
800 }
801
802 #[derive(Debug)]
804 pub struct DriftImpactAnalyzer<A: Float + Send + Sync> {
805 impact_history: VecDeque<DriftImpact<A>>,
807
808 severity_classifier: SeverityClassifier<A>,
810
811 recovery_predictor: RecoveryTimePredictor<A>,
813
814 business_impact_estimator: BusinessImpactEstimator<A>,
816 }
817
818 #[derive(Debug, Clone)]
820 pub struct DriftImpact<A: Float + Send + Sync> {
821 pub performance_degradation: A,
823
824 pub affected_metrics: Vec<String>,
826
827 pub estimated_recovery_time: Duration,
829
830 pub confidence: A,
832
833 pub business_impact_score: A,
835
836 pub urgency_level: UrgencyLevel,
838 }
839
840 #[derive(Debug, Clone, Copy, PartialEq)]
842 pub enum UrgencyLevel {
843 Low,
844 Medium,
845 High,
846 Critical,
847 }
848
849 #[derive(Debug)]
851 pub struct AdaptationStrategySelector<A: Float + Send + Sync> {
852 strategies: Vec<AdaptationStrategy<A>>,
854
855 strategy_performance: HashMap<String, StrategyPerformance<A>>,
857
858 bandit: EpsilonGreedyBandit<A>,
860
861 context_strategy_map: HashMap<String, Vec<String>>,
863 }
864
865 #[derive(Debug, Clone)]
867 pub struct AdaptationStrategy<A: Float + Send + Sync> {
868 pub id: String,
870
871 pub strategy_type: AdaptationStrategyType,
873
874 pub parameters: HashMap<String, A>,
876
877 pub applicability_conditions: Vec<ApplicabilityCondition<A>>,
879
880 pub expected_effectiveness: A,
882
883 pub computational_cost: A,
885 }
886
887 #[derive(Debug, Clone, Copy)]
889 pub enum AdaptationStrategyType {
890 ParameterTuning,
891 ModelReplacement,
892 EnsembleReweighting,
893 ArchitectureChange,
894 DataAugmentation,
895 FeatureSelection,
896 Hybrid,
897 }
898
899 #[derive(Debug, Clone)]
901 pub struct ApplicabilityCondition<A: Float + Send + Sync> {
902 pub feature_name: String,
903 pub operator: ComparisonOperator,
904 pub threshold: A,
905 pub weight: A,
906 }
907
908 #[derive(Debug, Clone, Copy)]
909 pub enum ComparisonOperator {
910 GreaterThan,
911 LessThan,
912 Equal,
913 NotEqual,
914 GreaterEqual,
915 LessEqual,
916 }
917
918 #[derive(Debug, Clone)]
920 pub struct StrategyPerformance<A: Float + Send + Sync> {
921 pub success_rate: A,
922 pub average_improvement: A,
923 pub average_adaptation_time: Duration,
924 pub stability_after_adaptation: A,
925 pub usage_count: usize,
926 }
927
928 #[derive(Debug)]
930 pub struct EpsilonGreedyBandit<A: Float + Send + Sync> {
931 epsilon: A,
932 action_values: HashMap<String, A>,
933 action_counts: HashMap<String, usize>,
934 total_trials: usize,
935 }
936
937 #[derive(Debug)]
939 pub struct DriftDatabase<A: Float + Send + Sync> {
940 drift_events: Vec<StoredDriftEvent<A>>,
942
943 pattern_outcomes: HashMap<String, Vec<AdaptationOutcome<A>>>,
945
946 seasonal_patterns: HashMap<String, SeasonalPattern<A>>,
948
949 similarity_index: SimilarityIndex<A>,
951 }
952
953 #[derive(Debug, Clone)]
955 pub struct StoredDriftEvent<A: Float + Send + Sync> {
956 pub features: PatternFeatures<A>,
957 pub context: Vec<ContextFeature<A>>,
958 pub applied_strategy: String,
959 pub outcome: AdaptationOutcome<A>,
960 pub timestamp: Instant,
961 }
962
963 #[derive(Debug, Clone)]
965 pub struct AdaptationOutcome<A: Float + Send + Sync> {
966 pub success: bool,
967 pub performance_improvement: A,
968 pub adaptation_time: Duration,
969 pub stability_period: Duration,
970 pub side_effects: Vec<String>,
971 }
972
973 #[derive(Debug, Clone)]
975 pub struct SeasonalPattern<A: Float + Send + Sync> {
976 pub period: Duration,
977 pub amplitude: A,
978 pub phase_offset: Duration,
979 pub pattern_strength: A,
980 pub last_occurrence: Instant,
981 }
982
983 #[derive(Debug)]
985 pub struct SimilarityIndex<A: Float + Send + Sync> {
986 feature_vectors: Vec<(String, Vec<A>)>,
988
989 similarity_threshold: A,
991
992 distance_metric: DistanceMetric,
994 }
995
996 #[derive(Debug, Clone, Copy)]
997 pub enum DistanceMetric {
998 Euclidean,
999 Manhattan,
1000 Cosine,
1001 Mahalanobis,
1002 }
1003
1004 impl<
1005 A: Float + Default + Clone + std::fmt::Debug + std::iter::Sum + Send + Sync + Send + Sync,
1006 > AdvancedDriftDetector<A>
1007 {
1008 pub fn new(config: DriftDetectorConfig) -> Self {
1010 let base_detectors: Vec<Box<dyn DriftDetectorTrait<A>>> = vec![
1011 ];
1013
1014 Self {
1015 base_detectors,
1016 pattern_analyzer: DriftPatternAnalyzer::new(),
1017 threshold_manager: AdaptiveThresholdManager::new(),
1018 context_detector: ContextAwareDriftDetector::new(),
1019 impact_analyzer: DriftImpactAnalyzer::new(),
1020 adaptation_selector: AdaptationStrategySelector::new(),
1021 drift_database: DriftDatabase::new(),
1022 }
1023 }
1024
1025 pub fn detect_drift_advanced(
1027 &mut self,
1028 value: A,
1029 context_features: &[ContextFeature<A>],
1030 ) -> Result<AdvancedDriftResult<A>> {
1031 self.context_detector.update_context(context_features);
1033
1034 let base_results: Vec<_> = self
1036 .base_detectors
1037 .iter_mut()
1038 .map(|detector| detector.update(value))
1039 .collect();
1040
1041 let pattern_features = self.pattern_analyzer.extract_features(&[value])?;
1043 let matched_pattern = self.pattern_analyzer.match_pattern(&pattern_features);
1044
1045 self.threshold_manager
1047 .update_thresholds(&base_results, &pattern_features);
1048
1049 let combined_result = self.combine_detection_results(&base_results, &matched_pattern);
1051
1052 let impact = if combined_result.status == DriftStatus::Drift {
1054 Some(
1055 self.impact_analyzer
1056 .analyze_impact(&pattern_features, &matched_pattern)?,
1057 )
1058 } else {
1059 None
1060 };
1061
1062 let adaptation_strategy = if let Some(ref impact) = impact {
1064 self.adaptation_selector.select_strategy(
1065 &pattern_features,
1066 impact,
1067 &matched_pattern,
1068 )?
1069 } else {
1070 None
1071 };
1072
1073 if combined_result.status == DriftStatus::Drift {
1075 self.drift_database.store_event(
1076 &pattern_features,
1077 context_features,
1078 &adaptation_strategy,
1079 );
1080 }
1081
1082 Ok(AdvancedDriftResult {
1083 status: combined_result.status,
1084 confidence: combined_result.confidence,
1085 matched_pattern,
1086 impact,
1087 recommended_strategy: adaptation_strategy,
1088 feature_importance: self.calculate_feature_importance(&pattern_features),
1089 prediction_horizon: self.estimate_drift_duration(&pattern_features),
1090 })
1091 }
1092
1093 fn combine_detection_results(
1094 &self,
1095 base_results: &[DriftStatus],
1096 matched_pattern: &Option<DriftPattern<A>>,
1097 ) -> CombinedDetectionResult<A> {
1098 let drift_votes = base_results
1100 .iter()
1101 .filter(|&&s| s == DriftStatus::Drift)
1102 .count();
1103 let warning_votes = base_results
1104 .iter()
1105 .filter(|&&s| s == DriftStatus::Warning)
1106 .count();
1107
1108 let pattern_confidence = matched_pattern
1110 .as_ref()
1111 .map(|p| p.adaptation_success_rate)
1112 .unwrap_or(A::from(0.5).expect("unwrap failed"));
1113
1114 let status = if drift_votes >= 2 {
1115 DriftStatus::Drift
1116 } else if warning_votes >= 2
1117 || (drift_votes >= 1 && pattern_confidence > A::from(0.7).expect("unwrap failed"))
1118 {
1119 DriftStatus::Warning
1120 } else {
1121 DriftStatus::Stable
1122 };
1123
1124 let confidence = A::from(drift_votes as f64 / base_results.len() as f64)
1125 .expect("unwrap failed")
1126 * pattern_confidence;
1127
1128 CombinedDetectionResult { status, confidence }
1129 }
1130
1131 fn calculate_feature_importance(
1132 &self,
1133 features: &PatternFeatures<A>,
1134 ) -> HashMap<String, A> {
1135 let mut importance = HashMap::new();
1137 importance.insert("variance".to_string(), features.variance);
1138 importance.insert("trend_slope".to_string(), features.trend_slope.abs());
1139 importance.insert("entropy".to_string(), features.entropy);
1140 importance
1141 }
1142
1143 fn estimate_drift_duration(&self, features: &PatternFeatures<A>) -> Duration {
1144 let base_duration = Duration::from_secs(300); let duration_multiplier = features.trend_strength * features.persistence;
1149 let adjustment = duration_multiplier.to_f64().unwrap_or(1.0);
1150
1151 Duration::from_secs((base_duration.as_secs() as f64 * adjustment) as u64)
1152 }
1153 }
1154
1155 #[derive(Debug, Clone)]
1157 pub struct AdvancedDriftResult<A: Float + Send + Sync> {
1158 pub status: DriftStatus,
1159 pub confidence: A,
1160 pub matched_pattern: Option<DriftPattern<A>>,
1161 pub impact: Option<DriftImpact<A>>,
1162 pub recommended_strategy: Option<AdaptationStrategy<A>>,
1163 pub feature_importance: HashMap<String, A>,
1164 pub prediction_horizon: Duration,
1165 }
1166
1167 #[derive(Debug, Clone)]
1168 struct CombinedDetectionResult<A: Float + Send + Sync> {
1169 status: DriftStatus,
1170 confidence: A,
1171 }
1172
1173 impl<A: Float + std::iter::Sum + Send + Sync + Send + Sync> DriftPatternAnalyzer<A> {
1176 fn new() -> Self {
1177 Self {
1178 pattern_buffer: VecDeque::new(),
1179 known_patterns: HashMap::new(),
1180 matching_threshold: A::from(0.8).expect("unwrap failed"),
1181 feature_extractors: Vec::new(),
1182 }
1183 }
1184
1185 fn extract_features(&mut self, data: &[A]) -> Result<PatternFeatures<A>> {
1186 let mean =
1188 data.iter().cloned().sum::<A>() / A::from(data.len()).expect("unwrap failed");
1189 let variance = data.iter().map(|&x| (x - mean) * (x - mean)).sum::<A>()
1190 / A::from(data.len()).expect("unwrap failed");
1191
1192 Ok(PatternFeatures {
1193 mean,
1194 variance,
1195 skewness: A::zero(), kurtosis: A::zero(),
1197 trend_slope: A::zero(),
1198 trend_strength: A::zero(),
1199 dominant_frequency: A::zero(),
1200 spectral_entropy: A::zero(),
1201 temporal_locality: A::zero(),
1202 persistence: A::zero(),
1203 entropy: variance.ln().abs(), fractal_dimension: A::from(1.5).expect("unwrap failed"), })
1206 }
1207
1208 fn match_pattern(&self, features: &PatternFeatures<A>) -> Option<DriftPattern<A>> {
1209 self.known_patterns
1211 .values()
1212 .find(|pattern| {
1213 self.calculate_similarity(&pattern.features, features) > self.matching_threshold
1214 })
1215 .cloned()
1216 }
1217
1218 fn calculate_similarity(&self, p1: &PatternFeatures<A>, p2: &PatternFeatures<A>) -> A {
1219 let mean_diff = (p1.mean - p2.mean).abs();
1221 let var_diff = (p1.variance - p2.variance).abs();
1222 A::one() - (mean_diff + var_diff) / A::from(2.0).expect("unwrap failed")
1223 }
1224 }
1225
1226 impl<A: Float + Send + Sync + Send + Sync> AdaptiveThresholdManager<A> {
1227 fn new() -> Self {
1228 Self {
1229 thresholds: HashMap::new(),
1230 threshold_history: VecDeque::new(),
1231 performance_feedback: VecDeque::new(),
1232 learning_rate: A::from(0.01).expect("unwrap failed"),
1233 }
1234 }
1235
1236 fn update_thresholds(&mut self, results: &[DriftStatus], features: &PatternFeatures<A>) {
1237 for (i, result) in results.iter().enumerate() {
1239 let detector_name = format!("detector_{}", i);
1240 let current_threshold = self
1241 .thresholds
1242 .get(&detector_name)
1243 .cloned()
1244 .unwrap_or(A::from(1.0).expect("unwrap failed"));
1245
1246 let adjustment = if *result == DriftStatus::Drift {
1248 -self.learning_rate } else {
1250 self.learning_rate * A::from(0.1).expect("unwrap failed") };
1252
1253 let new_threshold = current_threshold + adjustment;
1254 self.thresholds.insert(detector_name.clone(), new_threshold);
1255
1256 self.threshold_history.push_back(ThresholdUpdate {
1257 detector_name,
1258 old_threshold: current_threshold,
1259 new_threshold,
1260 timestamp: Instant::now(),
1261 reason: "Performance-based adjustment".to_string(),
1262 });
1263 }
1264 }
1265 }
1266
1267 impl<A: Float + Send + Sync + Send + Sync> ContextAwareDriftDetector<A> {
1268 fn new() -> Self {
1269 Self {
1270 context_features: Vec::new(),
1271 context_models: HashMap::new(),
1272 current_context: None,
1273 transition_matrix: HashMap::new(),
1274 }
1275 }
1276
1277 fn update_context(&mut self, features: &[ContextFeature<A>]) {
1278 self.context_features = features.to_vec();
1279
1280 let context_id = if !features.is_empty()
1282 && features[0].value > A::from(0.5).expect("unwrap failed")
1283 {
1284 "high_activity".to_string()
1285 } else {
1286 "low_activity".to_string()
1287 };
1288
1289 self.current_context = Some(context_id);
1290 }
1291 }
1292
1293 impl<A: Float + Send + Sync + Send + Sync> DriftImpactAnalyzer<A> {
1294 fn new() -> Self {
1295 Self {
1296 impact_history: VecDeque::new(),
1297 severity_classifier: SeverityClassifier::new(),
1298 recovery_predictor: RecoveryTimePredictor::new(),
1299 business_impact_estimator: BusinessImpactEstimator::new(),
1300 }
1301 }
1302
1303 fn analyze_impact(
1304 &mut self,
1305 features: &PatternFeatures<A>,
1306 _pattern: &Option<DriftPattern<A>>,
1307 ) -> Result<DriftImpact<A>> {
1308 let performance_degradation = features.variance; let urgency_level = if performance_degradation > A::from(1.0).expect("unwrap failed") {
1310 UrgencyLevel::High
1311 } else {
1312 UrgencyLevel::Medium
1313 };
1314
1315 Ok(DriftImpact {
1316 performance_degradation,
1317 affected_metrics: vec!["accuracy".to_string(), "loss".to_string()],
1318 estimated_recovery_time: Duration::from_secs(300),
1319 confidence: A::from(0.8).expect("unwrap failed"),
1320 business_impact_score: performance_degradation,
1321 urgency_level,
1322 })
1323 }
1324 }
1325
1326 impl<A: Float + Send + Sync + Send + Sync> AdaptationStrategySelector<A> {
1327 fn new() -> Self {
1328 Self {
1329 strategies: Vec::new(),
1330 strategy_performance: HashMap::new(),
1331 bandit: EpsilonGreedyBandit::new(A::from(0.1).expect("unwrap failed")),
1332 context_strategy_map: HashMap::new(),
1333 }
1334 }
1335
1336 fn select_strategy(
1337 &mut self,
1338 features: &PatternFeatures<A>,
1339 _impact: &DriftImpact<A>,
1340 _pattern: &Option<DriftPattern<A>>,
1341 ) -> Result<Option<AdaptationStrategy<A>>> {
1342 let strategy = AdaptationStrategy {
1344 id: "increase_lr".to_string(),
1345 strategy_type: AdaptationStrategyType::ParameterTuning,
1346 parameters: {
1347 let mut params = HashMap::new();
1348 params.insert(
1349 "learning_rate_factor".to_string(),
1350 A::from(1.5).expect("unwrap failed"),
1351 );
1352 params
1353 },
1354 applicability_conditions: Vec::new(),
1355 expected_effectiveness: A::from(0.7).expect("unwrap failed"),
1356 computational_cost: A::from(0.1).expect("unwrap failed"),
1357 };
1358
1359 Ok(Some(strategy))
1360 }
1361 }
1362
1363 impl<A: Float + Send + Sync + Send + Sync> DriftDatabase<A> {
1364 fn new() -> Self {
1365 Self {
1366 drift_events: Vec::new(),
1367 pattern_outcomes: HashMap::new(),
1368 seasonal_patterns: HashMap::new(),
1369 similarity_index: SimilarityIndex::new(),
1370 }
1371 }
1372
1373 fn store_event(
1374 &mut self,
1375 features: &PatternFeatures<A>,
1376 context: &[ContextFeature<A>],
1377 strategy: &Option<AdaptationStrategy<A>>,
1378 ) {
1379 if let Some(strat) = strategy {
1380 let event = StoredDriftEvent {
1381 features: features.clone(),
1382 context: context.to_vec(),
1383 applied_strategy: strat.id.clone(),
1384 outcome: AdaptationOutcome {
1385 success: true, performance_improvement: A::from(0.1).expect("unwrap failed"),
1387 adaptation_time: Duration::from_secs(60),
1388 stability_period: Duration::from_secs(300),
1389 side_effects: Vec::new(),
1390 },
1391 timestamp: Instant::now(),
1392 };
1393
1394 self.drift_events.push(event);
1395 }
1396 }
1397 }
1398
1399 impl<A: Float + Send + Sync + Send + Sync> SimilarityIndex<A> {
1400 fn new() -> Self {
1401 Self {
1402 feature_vectors: Vec::new(),
1403 similarity_threshold: A::from(0.8).expect("unwrap failed"),
1404 distance_metric: DistanceMetric::Euclidean,
1405 }
1406 }
1407 }
1408
1409 impl<A: Float + Send + Sync + Send + Sync> EpsilonGreedyBandit<A> {
1410 fn new(epsilon: A) -> Self {
1411 Self {
1412 epsilon,
1413 action_values: HashMap::new(),
1414 action_counts: HashMap::new(),
1415 total_trials: 0,
1416 }
1417 }
1418 }
1419
1420 #[derive(Debug)]
1423 struct SeverityClassifier<A: Float + Send + Sync> {
1424 _phantom: std::marker::PhantomData<A>,
1425 }
1426
1427 impl<A: Float + Send + Sync + Send + Sync> SeverityClassifier<A> {
1428 fn new() -> Self {
1429 Self {
1430 _phantom: std::marker::PhantomData,
1431 }
1432 }
1433 }
1434
1435 #[derive(Debug)]
1436 struct RecoveryTimePredictor<A: Float + Send + Sync> {
1437 _phantom: std::marker::PhantomData<A>,
1438 }
1439
1440 impl<A: Float + Send + Sync + Send + Sync> RecoveryTimePredictor<A> {
1441 fn new() -> Self {
1442 Self {
1443 _phantom: std::marker::PhantomData,
1444 }
1445 }
1446 }
1447
1448 #[derive(Debug)]
1449 struct BusinessImpactEstimator<A: Float + Send + Sync> {
1450 _phantom: std::marker::PhantomData<A>,
1451 }
1452
1453 impl<A: Float + Send + Sync + Send + Sync> BusinessImpactEstimator<A> {
1454 fn new() -> Self {
1455 Self {
1456 _phantom: std::marker::PhantomData,
1457 }
1458 }
1459 }
1460}
1461
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_page_hinkley_detector() {
        let mut detector = PageHinkleyDetector::new(3.0f64, 2.0f64);

        // A run of low losses must keep the detector stable.
        for _ in 0..10 {
            assert_eq!(detector.update(0.1), DriftStatus::Stable);
        }

        // Feed elevated losses, bailing out as soon as drift is flagged.
        // NOTE(review): nothing asserts that drift actually fires here.
        for _ in 0..5 {
            if detector.update(0.5) == DriftStatus::Drift {
                break;
            }
        }
    }

    #[test]
    fn test_adwin_detector() {
        let mut detector = AdwinDetector::new(0.005f64, 100);

        // Warm-up phase: slowly rising baseline around 0.1.
        for i in 0..20 {
            detector.update(0.1 + (i as f64) * 0.001);
        }

        // Shifted phase around 0.5; stop once drift is reported.
        for i in 0..10 {
            if detector.update(0.5 + (i as f64) * 0.01) == DriftStatus::Drift {
                break;
            }
        }
    }

    #[test]
    fn test_ddm_detector() {
        let mut detector = DdmDetector::<f64>::new();

        // Baseline phase: a 10% error rate.
        for i in 0..50 {
            detector.update(i % 10 == 0);
        }

        // Degraded phase: a 50% error rate; stop once drift is signalled.
        for i in 0..20 {
            if detector.update(i % 2 == 0) == DriftStatus::Drift {
                break;
            }
        }
    }

    #[test]
    fn test_concept_drift_detector() {
        let mut detector = ConceptDriftDetector::new(DriftDetectorConfig::default());

        // Stable phase: low, slowly rising loss with a 10% error rate must
        // never be classified as a full drift.
        for i in 0..30 {
            let status = detector
                .update(0.1 + (i as f64) * 0.001, i % 10 == 0)
                .expect("unwrap failed");
            assert_ne!(status, DriftStatus::Drift);
        }

        // Shifted phase: noticeably higher loss and a 50% error rate.
        for i in 0..20 {
            let _ = detector
                .update(0.5 + (i as f64) * 0.01, i % 2 == 0)
                .expect("unwrap failed");
        }

        // The shift should register either as full drifts or as an
        // elevated recent drift rate.
        let stats = detector.get_statistics();
        assert!(stats.total_drifts > 0 || stats.recent_drift_rate > 0.0);
    }

    #[test]
    fn test_drift_event() {
        let event = DriftEvent {
            timestamp: Instant::now(),
            confidence: 0.85f64,
            drift_type: DriftType::Sudden,
            adaptation_recommendation: AdaptationRecommendation::Reset,
        };

        assert_eq!(event.drift_type, DriftType::Sudden);
        assert!(event.confidence > 0.8);
    }
}