1use serde::{Deserialize, Serialize};
16use std::collections::{HashMap, VecDeque};
17use std::sync::Arc;
18use std::time::{Duration, Instant, SystemTime};
19use tokio::sync::RwLock;
20
21use crate::error::StreamError;
22
23#[derive(Debug, Clone, Serialize, Deserialize)]
25pub struct AnomalyConfig {
26 pub initial_threshold: f64,
28 pub window_size: usize,
30 pub adaptation_rate: f64,
32 pub warmup_samples: usize,
34 pub seasonal_detection: bool,
36 pub seasonal_period: Option<usize>,
38 pub use_ensemble: bool,
40 pub contamination: f64,
42 pub alert_cooldown: Duration,
44 pub max_alerts_per_period: usize,
46}
47
48impl Default for AnomalyConfig {
49 fn default() -> Self {
50 Self {
51 initial_threshold: 3.0,
52 window_size: 1000,
53 adaptation_rate: 0.01,
54 warmup_samples: 100,
55 seasonal_detection: false,
56 seasonal_period: None,
57 use_ensemble: true,
58 contamination: 0.01,
59 alert_cooldown: Duration::from_secs(60),
60 max_alerts_per_period: 10,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
67pub enum DetectorType {
68 ZScore,
70 ModifiedZScore,
72 IQR,
74 EWMA,
76 IsolationForest,
78 LOF,
80 OneClassSVM,
82 SeasonalHybridESD,
84 CUSUM,
86}
87
88#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
90pub enum AnomalySeverity {
91 Low,
93 Medium,
95 High,
97 Critical,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct Anomaly {
104 pub id: String,
106 pub timestamp: SystemTime,
108 pub value: f64,
110 pub expected: f64,
112 pub score: f64,
114 pub severity: AnomalySeverity,
116 pub detector: DetectorType,
118 pub context: HashMap<String, String>,
120 pub feature_index: Option<usize>,
122}
123
124#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct AnomalyAlert {
127 pub alert_id: String,
129 pub anomaly: Anomaly,
131 pub timestamp: SystemTime,
133 pub message: String,
135 pub acknowledged: bool,
137 pub action: Option<String>,
139}
140
141#[derive(Debug, Clone, Default, Serialize, Deserialize)]
143pub struct AnomalyStats {
144 pub total_samples: u64,
146 pub total_anomalies: u64,
148 pub by_severity: HashMap<String, u64>,
150 pub current_threshold: f64,
152 pub current_mean: f64,
154 pub current_std: f64,
156 pub detection_rate: f64,
158 pub false_positive_estimate: f64,
160 pub avg_anomaly_score: f64,
162 pub alerts_generated: u64,
164}
165
166#[derive(Debug, Clone)]
168struct RunningStats {
169 count: u64,
171 mean: f64,
173 m2: f64,
175 min: f64,
177 max: f64,
179 recent_values: VecDeque<f64>,
181 sorted_values: Vec<f64>,
183 needs_sort: bool,
185}
186
187impl RunningStats {
188 fn new(capacity: usize) -> Self {
189 Self {
190 count: 0,
191 mean: 0.0,
192 m2: 0.0,
193 min: f64::MAX,
194 max: f64::MIN,
195 recent_values: VecDeque::with_capacity(capacity),
196 sorted_values: Vec::with_capacity(capacity),
197 needs_sort: true,
198 }
199 }
200
201 fn update(&mut self, value: f64, window_size: usize) {
202 self.count += 1;
203
204 let delta = value - self.mean;
206 self.mean += delta / self.count as f64;
207 let delta2 = value - self.mean;
208 self.m2 += delta * delta2;
209
210 self.min = self.min.min(value);
212 self.max = self.max.max(value);
213
214 self.recent_values.push_back(value);
216 if self.recent_values.len() > window_size {
217 self.recent_values.pop_front();
218 }
219
220 self.needs_sort = true;
221 }
222
223 fn variance(&self) -> f64 {
224 if self.count < 2 {
225 0.0
226 } else {
227 self.m2 / (self.count - 1) as f64
228 }
229 }
230
231 fn std(&self) -> f64 {
232 self.variance().sqrt()
233 }
234
235 fn percentile(&mut self, p: f64) -> f64 {
236 if self.recent_values.is_empty() {
237 return 0.0;
238 }
239
240 if self.needs_sort {
241 self.sorted_values = self.recent_values.iter().copied().collect();
242 self.sorted_values.sort_by(|a, b| {
243 a.partial_cmp(b)
244 .expect("anomaly detection values should not be NaN")
245 });
246 self.needs_sort = false;
247 }
248
249 let idx = ((self.sorted_values.len() as f64 - 1.0) * p / 100.0) as usize;
250 self.sorted_values[idx.min(self.sorted_values.len() - 1)]
251 }
252
253 fn median(&mut self) -> f64 {
254 self.percentile(50.0)
255 }
256
257 fn mad(&mut self) -> f64 {
258 let median = self.median();
260 let mut abs_deviations: Vec<f64> = self
261 .recent_values
262 .iter()
263 .map(|&x| (x - median).abs())
264 .collect();
265 abs_deviations.sort_by(|a, b| {
266 a.partial_cmp(b)
267 .expect("absolute deviations should not be NaN")
268 });
269
270 if abs_deviations.is_empty() {
271 0.0
272 } else {
273 let mid = abs_deviations.len() / 2;
274 abs_deviations[mid]
275 }
276 }
277}
278
279#[derive(Debug, Clone)]
281struct EWMAState {
282 smoothed_mean: f64,
284 smoothed_var: f64,
286 alpha: f64,
288 initialized: bool,
290}
291
292impl EWMAState {
293 fn new(alpha: f64) -> Self {
294 Self {
295 smoothed_mean: 0.0,
296 smoothed_var: 0.0,
297 alpha,
298 initialized: false,
299 }
300 }
301
302 fn update(&mut self, value: f64) {
303 if !self.initialized {
304 self.smoothed_mean = value;
305 self.smoothed_var = 0.0;
306 self.initialized = true;
307 } else {
308 let error = value - self.smoothed_mean;
309 self.smoothed_mean += self.alpha * error;
310 self.smoothed_var = (1.0 - self.alpha) * self.smoothed_var + self.alpha * error * error;
312 }
313 }
314
315 fn std(&self) -> f64 {
316 self.smoothed_var.sqrt()
317 }
318}
319
320#[derive(Debug, Clone)]
322struct CUSUMState {
323 s_pos: f64,
325 s_neg: f64,
327 target: f64,
329 slack: f64,
331 threshold: f64,
333}
334
335impl CUSUMState {
336 fn new(target: f64, slack: f64, threshold: f64) -> Self {
337 Self {
338 s_pos: 0.0,
339 s_neg: 0.0,
340 target,
341 slack,
342 threshold,
343 }
344 }
345
346 fn update(&mut self, value: f64) -> bool {
347 let z = value - self.target;
348
349 self.s_pos = (self.s_pos + z - self.slack).max(0.0);
350 self.s_neg = (self.s_neg - z - self.slack).max(0.0);
351
352 let is_anomaly = self.s_pos > self.threshold || self.s_neg > self.threshold;
353
354 if is_anomaly {
355 self.s_pos = 0.0;
356 self.s_neg = 0.0;
357 }
358
359 is_anomaly
360 }
361}
362
363pub struct AnomalyDetector {
365 config: AnomalyConfig,
367 stats: Arc<RwLock<RunningStats>>,
369 ewma: Arc<RwLock<EWMAState>>,
371 cusum: Arc<RwLock<CUSUMState>>,
373 threshold: Arc<RwLock<f64>>,
375 anomaly_history: Arc<RwLock<VecDeque<Anomaly>>>,
377 alert_history: Arc<RwLock<VecDeque<AnomalyAlert>>>,
379 detection_stats: Arc<RwLock<AnomalyStats>>,
381 last_alert_time: Arc<RwLock<Instant>>,
383 alerts_in_period: Arc<RwLock<usize>>,
385 recent_scores: Arc<RwLock<VecDeque<f64>>>,
387 seasonal_component: Arc<RwLock<Vec<f64>>>,
389}
390
391impl AnomalyDetector {
392 pub fn new(config: AnomalyConfig) -> Self {
394 let threshold = config.initial_threshold;
395
396 Self {
397 config: config.clone(),
398 stats: Arc::new(RwLock::new(RunningStats::new(config.window_size))),
399 ewma: Arc::new(RwLock::new(EWMAState::new(0.3))),
400 cusum: Arc::new(RwLock::new(CUSUMState::new(0.0, 0.5, 5.0))),
401 threshold: Arc::new(RwLock::new(threshold)),
402 anomaly_history: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
403 alert_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
404 detection_stats: Arc::new(RwLock::new(AnomalyStats::default())),
405 last_alert_time: Arc::new(RwLock::new(Instant::now())),
406 alerts_in_period: Arc::new(RwLock::new(0)),
407 recent_scores: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
408 seasonal_component: Arc::new(RwLock::new(Vec::new())),
409 }
410 }
411
412 pub async fn detect(&self, value: f64) -> Result<Option<Anomaly>, StreamError> {
414 let mut stats = self.stats.write().await;
416 stats.update(value, self.config.window_size);
417
418 let count = stats.count;
419 let mean = stats.mean;
420 let std = stats.std();
421
422 drop(stats);
423
424 {
426 let mut ewma = self.ewma.write().await;
427 ewma.update(value);
428 }
429
430 if count < self.config.warmup_samples as u64 {
432 self.update_detection_stats(value, None).await;
433 return Ok(None);
434 }
435
436 let anomaly = if self.config.use_ensemble {
438 self.ensemble_detect(value, mean, std).await?
439 } else {
440 self.single_detect(value, mean, std, DetectorType::ZScore)
441 .await?
442 };
443
444 if let Some(ref anom) = anomaly {
446 self.adapt_threshold(anom.score).await;
447 }
448
449 self.update_detection_stats(value, anomaly.as_ref()).await;
451
452 if let Some(ref anom) = anomaly {
454 self.maybe_generate_alert(anom).await?;
455 }
456
457 Ok(anomaly)
458 }
459
460 pub async fn detect_batch(&self, values: &[f64]) -> Result<Vec<Option<Anomaly>>, StreamError> {
462 let mut results = Vec::with_capacity(values.len());
463
464 for &value in values {
465 let anomaly = self.detect(value).await?;
466 results.push(anomaly);
467 }
468
469 Ok(results)
470 }
471
472 pub async fn detect_with(
474 &self,
475 value: f64,
476 detector_type: DetectorType,
477 ) -> Result<Option<Anomaly>, StreamError> {
478 let mut stats = self.stats.write().await;
479 stats.update(value, self.config.window_size);
480
481 let count = stats.count;
482 let mean = stats.mean;
483 let std = stats.std();
484
485 drop(stats);
486
487 {
489 let mut ewma = self.ewma.write().await;
490 ewma.update(value);
491 }
492
493 if count < self.config.warmup_samples as u64 {
494 return Ok(None);
495 }
496
497 self.single_detect(value, mean, std, detector_type).await
498 }
499
500 pub async fn get_threshold(&self) -> f64 {
502 *self.threshold.read().await
503 }
504
505 pub async fn set_threshold(&self, threshold: f64) {
507 *self.threshold.write().await = threshold;
508 }
509
510 pub async fn get_stats(&self) -> AnomalyStats {
512 self.detection_stats.read().await.clone()
513 }
514
515 pub async fn get_anomalies(&self, limit: usize) -> Vec<Anomaly> {
517 let history = self.anomaly_history.read().await;
518 history.iter().rev().take(limit).cloned().collect()
519 }
520
521 pub async fn get_alerts(&self, limit: usize) -> Vec<AnomalyAlert> {
523 let history = self.alert_history.read().await;
524 history.iter().rev().take(limit).cloned().collect()
525 }
526
527 pub async fn acknowledge_alert(&self, alert_id: &str) -> Result<(), StreamError> {
529 let mut history = self.alert_history.write().await;
530
531 for alert in history.iter_mut() {
532 if alert.alert_id == alert_id {
533 alert.acknowledged = true;
534 return Ok(());
535 }
536 }
537
538 Err(StreamError::NotFound(format!(
539 "Alert not found: {}",
540 alert_id
541 )))
542 }
543
544 pub async fn reset(&self) {
546 *self.stats.write().await = RunningStats::new(self.config.window_size);
547 *self.ewma.write().await = EWMAState::new(0.3);
548 *self.threshold.write().await = self.config.initial_threshold;
549 self.anomaly_history.write().await.clear();
550 self.recent_scores.write().await.clear();
551 *self.detection_stats.write().await = AnomalyStats::default();
552 }
553
554 pub async fn set_seasonal_period(&self, period: usize) {
556 let mut seasonal = self.seasonal_component.write().await;
557 *seasonal = vec![0.0; period];
558 }
559
560 async fn single_detect(
563 &self,
564 value: f64,
565 mean: f64,
566 std: f64,
567 detector_type: DetectorType,
568 ) -> Result<Option<Anomaly>, StreamError> {
569 let threshold = *self.threshold.read().await;
570
571 let (is_anomaly, score, expected) = match detector_type {
572 DetectorType::ZScore => {
573 let z_score = if std > 0.0 {
574 (value - mean).abs() / std
575 } else {
576 0.0
577 };
578 (z_score > threshold, z_score, mean)
579 }
580 DetectorType::ModifiedZScore => {
581 let mut stats = self.stats.write().await;
582 let median = stats.median();
583 let mad = stats.mad();
584 drop(stats);
585
586 let modified_z = if mad > 0.0 {
587 0.6745 * (value - median).abs() / mad
588 } else {
589 0.0
590 };
591 (modified_z > threshold, modified_z, median)
592 }
593 DetectorType::IQR => {
594 let mut stats = self.stats.write().await;
595 let q1 = stats.percentile(25.0);
596 let q3 = stats.percentile(75.0);
597 drop(stats);
598
599 let iqr = q3 - q1;
600 let lower = q1 - 1.5 * iqr;
601 let upper = q3 + 1.5 * iqr;
602
603 let is_outlier = value < lower || value > upper;
604 let score = if is_outlier {
605 if value < lower {
606 (lower - value) / iqr.max(0.001)
607 } else {
608 (value - upper) / iqr.max(0.001)
609 }
610 } else {
611 0.0
612 };
613 (is_outlier, score, (q1 + q3) / 2.0)
614 }
615 DetectorType::EWMA => {
616 let ewma = self.ewma.read().await;
618 let ewma_mean = ewma.smoothed_mean;
619 let ewma_std = ewma.std();
620 drop(ewma);
621
622 let score = if ewma_std > 0.0 {
624 (value - ewma_mean).abs() / ewma_std
625 } else {
626 0.0
627 };
628
629 (score > threshold, score, ewma_mean)
630 }
631 DetectorType::CUSUM => {
632 {
634 let mut cusum = self.cusum.write().await;
635 if cusum.target == 0.0 {
636 cusum.target = mean;
637 }
638 }
639
640 let mut cusum = self.cusum.write().await;
641 let is_change = cusum.update(value);
642 drop(cusum);
643
644 let score = if is_change { threshold + 1.0 } else { 0.0 };
645 (is_change, score, mean)
646 }
647 _ => {
648 let z_score = if std > 0.0 {
650 (value - mean).abs() / std
651 } else {
652 0.0
653 };
654 (z_score > threshold, z_score, mean)
655 }
656 };
657
658 if is_anomaly {
659 let severity = self.calculate_severity(score, threshold);
660
661 Ok(Some(Anomaly {
662 id: uuid::Uuid::new_v4().to_string(),
663 timestamp: SystemTime::now(),
664 value,
665 expected,
666 score,
667 severity,
668 detector: detector_type,
669 context: HashMap::new(),
670 feature_index: None,
671 }))
672 } else {
673 Ok(None)
674 }
675 }
676
677 async fn ensemble_detect(
678 &self,
679 value: f64,
680 mean: f64,
681 std: f64,
682 ) -> Result<Option<Anomaly>, StreamError> {
683 let detectors = vec![
684 DetectorType::ZScore,
685 DetectorType::ModifiedZScore,
686 DetectorType::EWMA,
687 DetectorType::IQR,
688 ];
689
690 let mut votes = 0;
691 let mut total_score = 0.0;
692 let mut best_anomaly: Option<Anomaly> = None;
693 let mut max_score = 0.0;
694
695 for detector in detectors {
696 if let Some(anomaly) = self.single_detect(value, mean, std, detector).await? {
697 votes += 1;
698 total_score += anomaly.score;
699
700 if anomaly.score > max_score {
701 max_score = anomaly.score;
702 best_anomaly = Some(anomaly);
703 }
704 }
705 }
706
707 if votes >= 2 {
709 if let Some(mut anomaly) = best_anomaly {
710 anomaly.score = total_score / votes as f64;
711 anomaly
712 .context
713 .insert("votes".to_string(), votes.to_string());
714 anomaly
715 .context
716 .insert("detector".to_string(), "ensemble".to_string());
717 return Ok(Some(anomaly));
718 }
719 }
720
721 Ok(None)
722 }
723
724 fn calculate_severity(&self, score: f64, threshold: f64) -> AnomalySeverity {
725 let ratio = score / threshold;
726
727 if ratio > 3.0 {
728 AnomalySeverity::Critical
729 } else if ratio > 2.0 {
730 AnomalySeverity::High
731 } else if ratio > 1.5 {
732 AnomalySeverity::Medium
733 } else {
734 AnomalySeverity::Low
735 }
736 }
737
738 async fn adapt_threshold(&self, score: f64) {
739 let mut recent_scores = self.recent_scores.write().await;
740 recent_scores.push_back(score);
741
742 if recent_scores.len() > 1000 {
743 recent_scores.pop_front();
744 }
745
746 if recent_scores.len() >= 100 {
748 let mut threshold = self.threshold.write().await;
749
750 let mut sorted: Vec<f64> = recent_scores.iter().copied().collect();
752 sorted.sort_by(|a, b| a.partial_cmp(b).expect("anomaly scores should not be NaN"));
753
754 let idx = ((1.0 - self.config.contamination) * sorted.len() as f64) as usize;
756 let target_threshold = sorted[idx.min(sorted.len() - 1)];
757
758 *threshold += self.config.adaptation_rate * (target_threshold - *threshold);
760 }
761 }
762
763 async fn update_detection_stats(&self, _value: f64, anomaly: Option<&Anomaly>) {
764 let mut stats = self.detection_stats.write().await;
765 stats.total_samples += 1;
766
767 if let Some(anom) = anomaly {
768 stats.total_anomalies += 1;
769
770 let severity_key = format!("{:?}", anom.severity);
771 *stats.by_severity.entry(severity_key).or_insert(0) += 1;
772
773 let n = stats.total_anomalies as f64;
775 stats.avg_anomaly_score = stats.avg_anomaly_score * (n - 1.0) / n + anom.score / n;
776
777 let mut history = self.anomaly_history.write().await;
779 history.push_back(anom.clone());
780
781 if history.len() > 1000 {
782 history.pop_front();
783 }
784 }
785
786 if stats.total_samples > 0 {
788 stats.detection_rate = stats.total_anomalies as f64 / stats.total_samples as f64;
789 }
790
791 let running_stats = self.stats.read().await;
793 stats.current_mean = running_stats.mean;
794 stats.current_std = running_stats.std();
795 drop(running_stats);
796
797 stats.current_threshold = *self.threshold.read().await;
798 }
799
800 async fn maybe_generate_alert(&self, anomaly: &Anomaly) -> Result<(), StreamError> {
801 let last_alert = *self.last_alert_time.read().await;
803 if last_alert.elapsed() < self.config.alert_cooldown {
804 return Ok(());
805 }
806
807 let alerts = *self.alerts_in_period.read().await;
809 if alerts >= self.config.max_alerts_per_period {
810 return Ok(());
811 }
812
813 if anomaly.severity < AnomalySeverity::Medium {
815 return Ok(());
816 }
817
818 let alert = AnomalyAlert {
819 alert_id: uuid::Uuid::new_v4().to_string(),
820 anomaly: anomaly.clone(),
821 timestamp: SystemTime::now(),
822 message: format!(
823 "Anomaly detected: value={:.2}, expected={:.2}, score={:.2}, severity={:?}",
824 anomaly.value, anomaly.expected, anomaly.score, anomaly.severity
825 ),
826 acknowledged: false,
827 action: None,
828 };
829
830 let mut history = self.alert_history.write().await;
832 history.push_back(alert);
833
834 if history.len() > 100 {
835 history.pop_front();
836 }
837
838 *self.last_alert_time.write().await = Instant::now();
840 *self.alerts_in_period.write().await += 1;
841
842 let mut stats = self.detection_stats.write().await;
843 stats.alerts_generated += 1;
844
845 Ok(())
846 }
847}
848
849pub struct MultiDimensionalDetector {
851 detectors: Vec<AnomalyDetector>,
853 correlations: Arc<RwLock<Vec<Vec<f64>>>>,
855 mean_vector: Arc<RwLock<Vec<f64>>>,
857 inv_cov: Arc<RwLock<Vec<Vec<f64>>>>,
859 sample_count: Arc<RwLock<u64>>,
861}
862
863impl MultiDimensionalDetector {
864 pub fn new(dimensions: usize, config: AnomalyConfig) -> Self {
866 let detectors = (0..dimensions)
867 .map(|_| AnomalyDetector::new(config.clone()))
868 .collect();
869
870 Self {
871 detectors,
872 correlations: Arc::new(RwLock::new(vec![vec![0.0; dimensions]; dimensions])),
873 mean_vector: Arc::new(RwLock::new(vec![0.0; dimensions])),
874 inv_cov: Arc::new(RwLock::new(vec![vec![0.0; dimensions]; dimensions])),
875 sample_count: Arc::new(RwLock::new(0)),
876 }
877 }
878
879 pub async fn detect(&self, values: &[f64]) -> Result<Vec<Option<Anomaly>>, StreamError> {
881 if values.len() != self.detectors.len() {
882 return Err(StreamError::InvalidInput(format!(
883 "Expected {} dimensions, got {}",
884 self.detectors.len(),
885 values.len()
886 )));
887 }
888
889 let mut mean = self.mean_vector.write().await;
891 let mut count = self.sample_count.write().await;
892 *count += 1;
893
894 for (i, &v) in values.iter().enumerate() {
895 let delta = v - mean[i];
896 mean[i] += delta / *count as f64;
897 }
898
899 drop(mean);
900 drop(count);
901
902 let mut results = Vec::with_capacity(values.len());
904
905 for (i, (&value, detector)) in values.iter().zip(&self.detectors).enumerate() {
906 let mut anomaly = detector.detect(value).await?;
907
908 if let Some(ref mut anom) = anomaly {
910 anom.feature_index = Some(i);
911 }
912
913 results.push(anomaly);
914 }
915
916 Ok(results)
917 }
918
919 pub async fn mahalanobis_score(&self, values: &[f64]) -> f64 {
921 let mean = self.mean_vector.read().await;
922
923 if values.len() != mean.len() {
924 return 0.0;
925 }
926
927 let mut score = 0.0;
929 for (i, &v) in values.iter().enumerate() {
930 let diff = v - mean[i];
931 if let Ok(stats) = self.get_dimension_stats(i).await {
933 let var = stats.current_std.powi(2).max(0.001);
934 score += diff * diff / var;
935 }
936 }
937
938 score.sqrt()
939 }
940
941 pub async fn get_dimension_stats(&self, dimension: usize) -> Result<AnomalyStats, StreamError> {
943 if dimension >= self.detectors.len() {
944 return Err(StreamError::InvalidInput(format!(
945 "Dimension {} out of range",
946 dimension
947 )));
948 }
949
950 Ok(self.detectors[dimension].get_stats().await)
951 }
952}
953
954#[cfg(test)]
955mod tests {
956 use super::*;
957
958 #[tokio::test]
959 async fn test_zscore_detection() {
960 let config = AnomalyConfig {
961 warmup_samples: 10,
962 initial_threshold: 2.0,
963 ..Default::default()
964 };
965
966 let detector = AnomalyDetector::new(config);
967
968 for i in 0..100 {
970 let value = 50.0 + (i as f64 % 10.0) - 5.0;
971 detector.detect(value).await.unwrap();
972 }
973
974 let result = detector.detect(1000.0).await.unwrap();
976 assert!(result.is_some());
977
978 let anomaly = result.unwrap();
979 assert!(anomaly.score > 2.0);
980 }
981
982 #[tokio::test]
983 async fn test_modified_zscore() {
984 let config = AnomalyConfig {
985 warmup_samples: 10,
986 use_ensemble: false,
987 ..Default::default()
988 };
989
990 let detector = AnomalyDetector::new(config);
991
992 for i in 0..50 {
994 let value = 10.0 + (i % 5) as f64; detector.detect(value).await.unwrap();
996 }
997
998 let result = detector
1000 .detect_with(100.0, DetectorType::ModifiedZScore)
1001 .await
1002 .unwrap();
1003
1004 assert!(result.is_some());
1005 }
1006
1007 #[tokio::test]
1008 async fn test_iqr_detection() {
1009 let config = AnomalyConfig {
1010 warmup_samples: 10,
1011 use_ensemble: false,
1012 ..Default::default()
1013 };
1014
1015 let detector = AnomalyDetector::new(config);
1016
1017 for i in 0..100 {
1019 let value = 50.0 + (i % 20) as f64 - 10.0;
1020 detector.detect(value).await.unwrap();
1021 }
1022
1023 let result = detector
1024 .detect_with(200.0, DetectorType::IQR)
1025 .await
1026 .unwrap();
1027
1028 assert!(result.is_some());
1029 }
1030
1031 #[tokio::test]
1032 async fn test_ewma_detection() {
1033 let config = AnomalyConfig {
1034 warmup_samples: 10,
1035 use_ensemble: true, ..Default::default()
1037 };
1038
1039 let detector = AnomalyDetector::new(config);
1040
1041 for i in 0..100 {
1043 let value = 50.0 + ((i as f64).sin() * 5.0);
1044 detector.detect(value).await.unwrap();
1045 }
1046
1047 let result = detector.detect(200.0).await.unwrap();
1049
1050 assert!(
1051 result.is_some(),
1052 "Ensemble (including EWMA) should detect extreme outlier"
1053 );
1054 }
1055
1056 #[tokio::test]
1057 async fn test_ensemble_detection() {
1058 let config = AnomalyConfig {
1059 warmup_samples: 20,
1060 use_ensemble: true,
1061 ..Default::default()
1062 };
1063
1064 let detector = AnomalyDetector::new(config);
1065
1066 for i in 0..100 {
1068 let value = 50.0 + (i as f64).sin() * 5.0;
1069 detector.detect(value).await.unwrap();
1070 }
1071
1072 let result = detector.detect(500.0).await.unwrap();
1074 assert!(result.is_some());
1075
1076 if let Some(anomaly) = result {
1077 assert!(anomaly.context.contains_key("votes"));
1078 }
1079 }
1080
1081 #[tokio::test]
1082 async fn test_severity_levels() {
1083 let config = AnomalyConfig {
1084 warmup_samples: 10,
1085 initial_threshold: 2.0,
1086 ..Default::default()
1087 };
1088
1089 let detector = AnomalyDetector::new(config);
1090
1091 for i in 0..50 {
1093 let value = 100.0 + (i % 10) as f64; detector.detect(value).await.unwrap();
1095 }
1096
1097 let result = detector.detect(115.0).await.unwrap();
1099 if let Some(anomaly) = result {
1100 assert!(anomaly.severity <= AnomalySeverity::Medium);
1101 }
1102
1103 let result = detector.detect(1000.0).await.unwrap();
1105 assert!(result.is_some());
1106 assert!(result.unwrap().severity >= AnomalySeverity::High);
1107 }
1108
1109 #[tokio::test]
1110 async fn test_adaptive_threshold() {
1111 let config = AnomalyConfig {
1112 warmup_samples: 10,
1113 adaptation_rate: 0.2,
1114 use_ensemble: false,
1115 ..Default::default()
1116 };
1117
1118 let detector = AnomalyDetector::new(config);
1119
1120 let initial_threshold = detector.get_threshold().await;
1122 assert_eq!(initial_threshold, 3.0);
1123
1124 detector.set_threshold(4.0).await;
1126 let new_threshold = detector.get_threshold().await;
1127 assert_eq!(new_threshold, 4.0);
1128
1129 for i in 0..100 {
1131 let value = 50.0 + (i % 20) as f64;
1132 detector.detect(value).await.unwrap();
1133 }
1134
1135 let stats = detector.get_stats().await;
1137 assert_eq!(stats.total_samples, 100);
1138 assert!(stats.current_mean > 0.0);
1139
1140 let result = detector.detect(300.0).await.unwrap();
1142 let _is_anomaly = result.is_some();
1145 }
1146
1147 #[tokio::test]
1148 async fn test_statistics() {
1149 let config = AnomalyConfig {
1150 warmup_samples: 10,
1151 ..Default::default()
1152 };
1153
1154 let detector = AnomalyDetector::new(config);
1155
1156 for i in 0..100 {
1157 detector.detect(i as f64).await.unwrap();
1158 }
1159
1160 let stats = detector.get_stats().await;
1161 assert_eq!(stats.total_samples, 100);
1162 assert!(stats.current_mean > 0.0);
1163 }
1164
1165 #[tokio::test]
1166 async fn test_reset() {
1167 let config = AnomalyConfig::default();
1168 let detector = AnomalyDetector::new(config);
1169
1170 for i in 0..100 {
1171 detector.detect(i as f64).await.unwrap();
1172 }
1173
1174 detector.reset().await;
1175
1176 let stats = detector.get_stats().await;
1177 assert_eq!(stats.total_samples, 0);
1178 }
1179
1180 #[tokio::test]
1181 async fn test_cusum_detection() {
1182 let config = AnomalyConfig {
1183 warmup_samples: 10,
1184 use_ensemble: false,
1185 ..Default::default()
1186 };
1187
1188 let detector = AnomalyDetector::new(config);
1189
1190 for _ in 0..50 {
1192 detector.detect(100.0).await.unwrap();
1193 }
1194
1195 for _ in 0..10 {
1197 let result = detector
1198 .detect_with(200.0, DetectorType::CUSUM)
1199 .await
1200 .unwrap();
1201 if result.is_some() {
1202 return; }
1204 }
1205
1206 }
1208
1209 #[tokio::test]
1210 async fn test_batch_detection() {
1211 let config = AnomalyConfig {
1212 warmup_samples: 10,
1213 ..Default::default()
1214 };
1215
1216 let detector = AnomalyDetector::new(config);
1217
1218 for _ in 0..50 {
1220 detector.detect(100.0).await.unwrap();
1221 }
1222
1223 let values: Vec<f64> = vec![100.0, 101.0, 1000.0, 102.0, 999.0];
1224 let results = detector.detect_batch(&values).await.unwrap();
1225
1226 assert_eq!(results.len(), 5);
1227
1228 let anomaly_count = results.iter().filter(|r| r.is_some()).count();
1230 assert!(anomaly_count >= 1);
1231 }
1232
1233 #[tokio::test]
1234 async fn test_multi_dimensional() {
1235 let config = AnomalyConfig {
1236 warmup_samples: 10,
1237 use_ensemble: false,
1238 ..Default::default()
1239 };
1240
1241 let detector = MultiDimensionalDetector::new(3, config);
1242
1243 for _ in 0..50 {
1245 detector.detect(&[10.0, 20.0, 30.0]).await.unwrap();
1246 }
1247
1248 let results = detector.detect(&[1000.0, 20.0, 30.0]).await.unwrap();
1250
1251 assert!(results[0].is_some());
1252 assert!(results[0].as_ref().unwrap().feature_index == Some(0));
1253 }
1254
1255 #[tokio::test]
1256 async fn test_mahalanobis_score() {
1257 let config = AnomalyConfig {
1258 warmup_samples: 10,
1259 ..Default::default()
1260 };
1261
1262 let detector = MultiDimensionalDetector::new(2, config);
1263
1264 for _ in 0..100 {
1266 detector.detect(&[10.0, 20.0]).await.unwrap();
1267 }
1268
1269 let normal_score = detector.mahalanobis_score(&[10.0, 20.0]).await;
1271
1272 let anomaly_score = detector.mahalanobis_score(&[100.0, 200.0]).await;
1274
1275 assert!(anomaly_score > normal_score);
1276 }
1277
1278 #[tokio::test]
1279 async fn test_alert_generation() {
1280 let config = AnomalyConfig {
1281 warmup_samples: 10,
1282 alert_cooldown: Duration::from_millis(10),
1283 ..Default::default()
1284 };
1285
1286 let detector = AnomalyDetector::new(config);
1287
1288 for _ in 0..50 {
1290 detector.detect(100.0).await.unwrap();
1291 }
1292
1293 detector.detect(10000.0).await.unwrap();
1295
1296 tokio::time::sleep(Duration::from_millis(20)).await;
1298
1299 let alerts = detector.get_alerts(10).await;
1300 assert!(alerts.len() <= 1);
1302 }
1303
1304 #[tokio::test]
1305 async fn test_acknowledge_alert() {
1306 let config = AnomalyConfig {
1307 warmup_samples: 10,
1308 alert_cooldown: Duration::from_millis(1),
1309 ..Default::default()
1310 };
1311
1312 let detector = AnomalyDetector::new(config);
1313
1314 for _ in 0..50 {
1316 detector.detect(100.0).await.unwrap();
1317 }
1318
1319 detector.detect(10000.0).await.unwrap();
1320
1321 tokio::time::sleep(Duration::from_millis(5)).await;
1322
1323 let alerts = detector.get_alerts(10).await;
1324 if !alerts.is_empty() {
1325 detector
1326 .acknowledge_alert(&alerts[0].alert_id)
1327 .await
1328 .unwrap();
1329
1330 let updated_alerts = detector.get_alerts(10).await;
1331 assert!(updated_alerts[0].acknowledged);
1332 }
1333 }
1334}