use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::sync::RwLock;
use crate::error::StreamError;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyConfig {
pub initial_threshold: f64,
pub window_size: usize,
pub adaptation_rate: f64,
pub warmup_samples: usize,
pub seasonal_detection: bool,
pub seasonal_period: Option<usize>,
pub use_ensemble: bool,
pub contamination: f64,
pub alert_cooldown: Duration,
pub max_alerts_per_period: usize,
}
impl Default for AnomalyConfig {
fn default() -> Self {
Self {
initial_threshold: 3.0,
window_size: 1000,
adaptation_rate: 0.01,
warmup_samples: 100,
seasonal_detection: false,
seasonal_period: None,
use_ensemble: true,
contamination: 0.01,
alert_cooldown: Duration::from_secs(60),
max_alerts_per_period: 10,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum DetectorType {
ZScore,
ModifiedZScore,
IQR,
EWMA,
IsolationForest,
LOF,
OneClassSVM,
SeasonalHybridESD,
CUSUM,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Ord, PartialOrd, Eq)]
pub enum AnomalySeverity {
Low,
Medium,
High,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Anomaly {
pub id: String,
pub timestamp: SystemTime,
pub value: f64,
pub expected: f64,
pub score: f64,
pub severity: AnomalySeverity,
pub detector: DetectorType,
pub context: HashMap<String, String>,
pub feature_index: Option<usize>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyAlert {
pub alert_id: String,
pub anomaly: Anomaly,
pub timestamp: SystemTime,
pub message: String,
pub acknowledged: bool,
pub action: Option<String>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AnomalyStats {
pub total_samples: u64,
pub total_anomalies: u64,
pub by_severity: HashMap<String, u64>,
pub current_threshold: f64,
pub current_mean: f64,
pub current_std: f64,
pub detection_rate: f64,
pub false_positive_estimate: f64,
pub avg_anomaly_score: f64,
pub alerts_generated: u64,
}
#[derive(Debug, Clone)]
struct RunningStats {
count: u64,
mean: f64,
m2: f64,
min: f64,
max: f64,
recent_values: VecDeque<f64>,
sorted_values: Vec<f64>,
needs_sort: bool,
}
impl RunningStats {
fn new(capacity: usize) -> Self {
Self {
count: 0,
mean: 0.0,
m2: 0.0,
min: f64::MAX,
max: f64::MIN,
recent_values: VecDeque::with_capacity(capacity),
sorted_values: Vec::with_capacity(capacity),
needs_sort: true,
}
}
fn update(&mut self, value: f64, window_size: usize) {
self.count += 1;
let delta = value - self.mean;
self.mean += delta / self.count as f64;
let delta2 = value - self.mean;
self.m2 += delta * delta2;
self.min = self.min.min(value);
self.max = self.max.max(value);
self.recent_values.push_back(value);
if self.recent_values.len() > window_size {
self.recent_values.pop_front();
}
self.needs_sort = true;
}
fn variance(&self) -> f64 {
if self.count < 2 {
0.0
} else {
self.m2 / (self.count - 1) as f64
}
}
fn std(&self) -> f64 {
self.variance().sqrt()
}
fn percentile(&mut self, p: f64) -> f64 {
if self.recent_values.is_empty() {
return 0.0;
}
if self.needs_sort {
self.sorted_values = self.recent_values.iter().copied().collect();
self.sorted_values.sort_by(|a, b| {
a.partial_cmp(b)
.expect("anomaly detection values should not be NaN")
});
self.needs_sort = false;
}
let idx = ((self.sorted_values.len() as f64 - 1.0) * p / 100.0) as usize;
self.sorted_values[idx.min(self.sorted_values.len() - 1)]
}
fn median(&mut self) -> f64 {
self.percentile(50.0)
}
fn mad(&mut self) -> f64 {
let median = self.median();
let mut abs_deviations: Vec<f64> = self
.recent_values
.iter()
.map(|&x| (x - median).abs())
.collect();
abs_deviations.sort_by(|a, b| {
a.partial_cmp(b)
.expect("absolute deviations should not be NaN")
});
if abs_deviations.is_empty() {
0.0
} else {
let mid = abs_deviations.len() / 2;
abs_deviations[mid]
}
}
}
#[derive(Debug, Clone)]
struct EWMAState {
smoothed_mean: f64,
smoothed_var: f64,
alpha: f64,
initialized: bool,
}
impl EWMAState {
fn new(alpha: f64) -> Self {
Self {
smoothed_mean: 0.0,
smoothed_var: 0.0,
alpha,
initialized: false,
}
}
fn update(&mut self, value: f64) {
if !self.initialized {
self.smoothed_mean = value;
self.smoothed_var = 0.0;
self.initialized = true;
} else {
let error = value - self.smoothed_mean;
self.smoothed_mean += self.alpha * error;
self.smoothed_var = (1.0 - self.alpha) * self.smoothed_var + self.alpha * error * error;
}
}
fn std(&self) -> f64 {
self.smoothed_var.sqrt()
}
}
#[derive(Debug, Clone)]
struct CUSUMState {
s_pos: f64,
s_neg: f64,
target: f64,
slack: f64,
threshold: f64,
}
impl CUSUMState {
fn new(target: f64, slack: f64, threshold: f64) -> Self {
Self {
s_pos: 0.0,
s_neg: 0.0,
target,
slack,
threshold,
}
}
fn update(&mut self, value: f64) -> bool {
let z = value - self.target;
self.s_pos = (self.s_pos + z - self.slack).max(0.0);
self.s_neg = (self.s_neg - z - self.slack).max(0.0);
let is_anomaly = self.s_pos > self.threshold || self.s_neg > self.threshold;
if is_anomaly {
self.s_pos = 0.0;
self.s_neg = 0.0;
}
is_anomaly
}
}
pub struct AnomalyDetector {
config: AnomalyConfig,
stats: Arc<RwLock<RunningStats>>,
ewma: Arc<RwLock<EWMAState>>,
cusum: Arc<RwLock<CUSUMState>>,
threshold: Arc<RwLock<f64>>,
anomaly_history: Arc<RwLock<VecDeque<Anomaly>>>,
alert_history: Arc<RwLock<VecDeque<AnomalyAlert>>>,
detection_stats: Arc<RwLock<AnomalyStats>>,
last_alert_time: Arc<RwLock<Instant>>,
alerts_in_period: Arc<RwLock<usize>>,
recent_scores: Arc<RwLock<VecDeque<f64>>>,
seasonal_component: Arc<RwLock<Vec<f64>>>,
}
impl AnomalyDetector {
pub fn new(config: AnomalyConfig) -> Self {
let threshold = config.initial_threshold;
Self {
config: config.clone(),
stats: Arc::new(RwLock::new(RunningStats::new(config.window_size))),
ewma: Arc::new(RwLock::new(EWMAState::new(0.3))),
cusum: Arc::new(RwLock::new(CUSUMState::new(0.0, 0.5, 5.0))),
threshold: Arc::new(RwLock::new(threshold)),
anomaly_history: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
alert_history: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
detection_stats: Arc::new(RwLock::new(AnomalyStats::default())),
last_alert_time: Arc::new(RwLock::new(Instant::now())),
alerts_in_period: Arc::new(RwLock::new(0)),
recent_scores: Arc::new(RwLock::new(VecDeque::with_capacity(1000))),
seasonal_component: Arc::new(RwLock::new(Vec::new())),
}
}
pub async fn detect(&self, value: f64) -> Result<Option<Anomaly>, StreamError> {
let mut stats = self.stats.write().await;
stats.update(value, self.config.window_size);
let count = stats.count;
let mean = stats.mean;
let std = stats.std();
drop(stats);
{
let mut ewma = self.ewma.write().await;
ewma.update(value);
}
if count < self.config.warmup_samples as u64 {
self.update_detection_stats(value, None).await;
return Ok(None);
}
let anomaly = if self.config.use_ensemble {
self.ensemble_detect(value, mean, std).await?
} else {
self.single_detect(value, mean, std, DetectorType::ZScore)
.await?
};
if let Some(ref anom) = anomaly {
self.adapt_threshold(anom.score).await;
}
self.update_detection_stats(value, anomaly.as_ref()).await;
if let Some(ref anom) = anomaly {
self.maybe_generate_alert(anom).await?;
}
Ok(anomaly)
}
pub async fn detect_batch(&self, values: &[f64]) -> Result<Vec<Option<Anomaly>>, StreamError> {
let mut results = Vec::with_capacity(values.len());
for &value in values {
let anomaly = self.detect(value).await?;
results.push(anomaly);
}
Ok(results)
}
pub async fn detect_with(
&self,
value: f64,
detector_type: DetectorType,
) -> Result<Option<Anomaly>, StreamError> {
let mut stats = self.stats.write().await;
stats.update(value, self.config.window_size);
let count = stats.count;
let mean = stats.mean;
let std = stats.std();
drop(stats);
{
let mut ewma = self.ewma.write().await;
ewma.update(value);
}
if count < self.config.warmup_samples as u64 {
return Ok(None);
}
self.single_detect(value, mean, std, detector_type).await
}
pub async fn get_threshold(&self) -> f64 {
*self.threshold.read().await
}
pub async fn set_threshold(&self, threshold: f64) {
*self.threshold.write().await = threshold;
}
pub async fn get_stats(&self) -> AnomalyStats {
self.detection_stats.read().await.clone()
}
pub async fn get_anomalies(&self, limit: usize) -> Vec<Anomaly> {
let history = self.anomaly_history.read().await;
history.iter().rev().take(limit).cloned().collect()
}
pub async fn get_alerts(&self, limit: usize) -> Vec<AnomalyAlert> {
let history = self.alert_history.read().await;
history.iter().rev().take(limit).cloned().collect()
}
pub async fn acknowledge_alert(&self, alert_id: &str) -> Result<(), StreamError> {
let mut history = self.alert_history.write().await;
for alert in history.iter_mut() {
if alert.alert_id == alert_id {
alert.acknowledged = true;
return Ok(());
}
}
Err(StreamError::NotFound(format!(
"Alert not found: {}",
alert_id
)))
}
pub async fn reset(&self) {
*self.stats.write().await = RunningStats::new(self.config.window_size);
*self.ewma.write().await = EWMAState::new(0.3);
*self.threshold.write().await = self.config.initial_threshold;
self.anomaly_history.write().await.clear();
self.recent_scores.write().await.clear();
*self.detection_stats.write().await = AnomalyStats::default();
}
pub async fn set_seasonal_period(&self, period: usize) {
let mut seasonal = self.seasonal_component.write().await;
*seasonal = vec![0.0; period];
}
async fn single_detect(
&self,
value: f64,
mean: f64,
std: f64,
detector_type: DetectorType,
) -> Result<Option<Anomaly>, StreamError> {
let threshold = *self.threshold.read().await;
let (is_anomaly, score, expected) = match detector_type {
DetectorType::ZScore => {
let z_score = if std > 0.0 {
(value - mean).abs() / std
} else {
0.0
};
(z_score > threshold, z_score, mean)
}
DetectorType::ModifiedZScore => {
let mut stats = self.stats.write().await;
let median = stats.median();
let mad = stats.mad();
drop(stats);
let modified_z = if mad > 0.0 {
0.6745 * (value - median).abs() / mad
} else {
0.0
};
(modified_z > threshold, modified_z, median)
}
DetectorType::IQR => {
let mut stats = self.stats.write().await;
let q1 = stats.percentile(25.0);
let q3 = stats.percentile(75.0);
drop(stats);
let iqr = q3 - q1;
let lower = q1 - 1.5 * iqr;
let upper = q3 + 1.5 * iqr;
let is_outlier = value < lower || value > upper;
let score = if is_outlier {
if value < lower {
(lower - value) / iqr.max(0.001)
} else {
(value - upper) / iqr.max(0.001)
}
} else {
0.0
};
(is_outlier, score, (q1 + q3) / 2.0)
}
DetectorType::EWMA => {
let ewma = self.ewma.read().await;
let ewma_mean = ewma.smoothed_mean;
let ewma_std = ewma.std();
drop(ewma);
let score = if ewma_std > 0.0 {
(value - ewma_mean).abs() / ewma_std
} else {
0.0
};
(score > threshold, score, ewma_mean)
}
DetectorType::CUSUM => {
{
let mut cusum = self.cusum.write().await;
if cusum.target == 0.0 {
cusum.target = mean;
}
}
let mut cusum = self.cusum.write().await;
let is_change = cusum.update(value);
drop(cusum);
let score = if is_change { threshold + 1.0 } else { 0.0 };
(is_change, score, mean)
}
_ => {
let z_score = if std > 0.0 {
(value - mean).abs() / std
} else {
0.0
};
(z_score > threshold, z_score, mean)
}
};
if is_anomaly {
let severity = self.calculate_severity(score, threshold);
Ok(Some(Anomaly {
id: uuid::Uuid::new_v4().to_string(),
timestamp: SystemTime::now(),
value,
expected,
score,
severity,
detector: detector_type,
context: HashMap::new(),
feature_index: None,
}))
} else {
Ok(None)
}
}
async fn ensemble_detect(
&self,
value: f64,
mean: f64,
std: f64,
) -> Result<Option<Anomaly>, StreamError> {
let detectors = vec![
DetectorType::ZScore,
DetectorType::ModifiedZScore,
DetectorType::EWMA,
DetectorType::IQR,
];
let mut votes = 0;
let mut total_score = 0.0;
let mut best_anomaly: Option<Anomaly> = None;
let mut max_score = 0.0;
for detector in detectors {
if let Some(anomaly) = self.single_detect(value, mean, std, detector).await? {
votes += 1;
total_score += anomaly.score;
if anomaly.score > max_score {
max_score = anomaly.score;
best_anomaly = Some(anomaly);
}
}
}
if votes >= 2 {
if let Some(mut anomaly) = best_anomaly {
anomaly.score = total_score / votes as f64;
anomaly
.context
.insert("votes".to_string(), votes.to_string());
anomaly
.context
.insert("detector".to_string(), "ensemble".to_string());
return Ok(Some(anomaly));
}
}
Ok(None)
}
fn calculate_severity(&self, score: f64, threshold: f64) -> AnomalySeverity {
let ratio = score / threshold;
if ratio > 3.0 {
AnomalySeverity::Critical
} else if ratio > 2.0 {
AnomalySeverity::High
} else if ratio > 1.5 {
AnomalySeverity::Medium
} else {
AnomalySeverity::Low
}
}
async fn adapt_threshold(&self, score: f64) {
let mut recent_scores = self.recent_scores.write().await;
recent_scores.push_back(score);
if recent_scores.len() > 1000 {
recent_scores.pop_front();
}
if recent_scores.len() >= 100 {
let mut threshold = self.threshold.write().await;
let mut sorted: Vec<f64> = recent_scores.iter().copied().collect();
sorted.sort_by(|a, b| a.partial_cmp(b).expect("anomaly scores should not be NaN"));
let idx = ((1.0 - self.config.contamination) * sorted.len() as f64) as usize;
let target_threshold = sorted[idx.min(sorted.len() - 1)];
*threshold += self.config.adaptation_rate * (target_threshold - *threshold);
}
}
async fn update_detection_stats(&self, _value: f64, anomaly: Option<&Anomaly>) {
let mut stats = self.detection_stats.write().await;
stats.total_samples += 1;
if let Some(anom) = anomaly {
stats.total_anomalies += 1;
let severity_key = format!("{:?}", anom.severity);
*stats.by_severity.entry(severity_key).or_insert(0) += 1;
let n = stats.total_anomalies as f64;
stats.avg_anomaly_score = stats.avg_anomaly_score * (n - 1.0) / n + anom.score / n;
let mut history = self.anomaly_history.write().await;
history.push_back(anom.clone());
if history.len() > 1000 {
history.pop_front();
}
}
if stats.total_samples > 0 {
stats.detection_rate = stats.total_anomalies as f64 / stats.total_samples as f64;
}
let running_stats = self.stats.read().await;
stats.current_mean = running_stats.mean;
stats.current_std = running_stats.std();
drop(running_stats);
stats.current_threshold = *self.threshold.read().await;
}
async fn maybe_generate_alert(&self, anomaly: &Anomaly) -> Result<(), StreamError> {
let last_alert = *self.last_alert_time.read().await;
if last_alert.elapsed() < self.config.alert_cooldown {
return Ok(());
}
let alerts = *self.alerts_in_period.read().await;
if alerts >= self.config.max_alerts_per_period {
return Ok(());
}
if anomaly.severity < AnomalySeverity::Medium {
return Ok(());
}
let alert = AnomalyAlert {
alert_id: uuid::Uuid::new_v4().to_string(),
anomaly: anomaly.clone(),
timestamp: SystemTime::now(),
message: format!(
"Anomaly detected: value={:.2}, expected={:.2}, score={:.2}, severity={:?}",
anomaly.value, anomaly.expected, anomaly.score, anomaly.severity
),
acknowledged: false,
action: None,
};
let mut history = self.alert_history.write().await;
history.push_back(alert);
if history.len() > 100 {
history.pop_front();
}
*self.last_alert_time.write().await = Instant::now();
*self.alerts_in_period.write().await += 1;
let mut stats = self.detection_stats.write().await;
stats.alerts_generated += 1;
Ok(())
}
}
pub struct MultiDimensionalDetector {
detectors: Vec<AnomalyDetector>,
correlations: Arc<RwLock<Vec<Vec<f64>>>>,
mean_vector: Arc<RwLock<Vec<f64>>>,
inv_cov: Arc<RwLock<Vec<Vec<f64>>>>,
sample_count: Arc<RwLock<u64>>,
}
impl MultiDimensionalDetector {
pub fn new(dimensions: usize, config: AnomalyConfig) -> Self {
let detectors = (0..dimensions)
.map(|_| AnomalyDetector::new(config.clone()))
.collect();
Self {
detectors,
correlations: Arc::new(RwLock::new(vec![vec![0.0; dimensions]; dimensions])),
mean_vector: Arc::new(RwLock::new(vec![0.0; dimensions])),
inv_cov: Arc::new(RwLock::new(vec![vec![0.0; dimensions]; dimensions])),
sample_count: Arc::new(RwLock::new(0)),
}
}
pub async fn detect(&self, values: &[f64]) -> Result<Vec<Option<Anomaly>>, StreamError> {
if values.len() != self.detectors.len() {
return Err(StreamError::InvalidInput(format!(
"Expected {} dimensions, got {}",
self.detectors.len(),
values.len()
)));
}
let mut mean = self.mean_vector.write().await;
let mut count = self.sample_count.write().await;
*count += 1;
for (i, &v) in values.iter().enumerate() {
let delta = v - mean[i];
mean[i] += delta / *count as f64;
}
drop(mean);
drop(count);
let mut results = Vec::with_capacity(values.len());
for (i, (&value, detector)) in values.iter().zip(&self.detectors).enumerate() {
let mut anomaly = detector.detect(value).await?;
if let Some(ref mut anom) = anomaly {
anom.feature_index = Some(i);
}
results.push(anomaly);
}
Ok(results)
}
pub async fn mahalanobis_score(&self, values: &[f64]) -> f64 {
let mean = self.mean_vector.read().await;
if values.len() != mean.len() {
return 0.0;
}
let mut score = 0.0;
for (i, &v) in values.iter().enumerate() {
let diff = v - mean[i];
if let Ok(stats) = self.get_dimension_stats(i).await {
let var = stats.current_std.powi(2).max(0.001);
score += diff * diff / var;
}
}
score.sqrt()
}
pub async fn get_dimension_stats(&self, dimension: usize) -> Result<AnomalyStats, StreamError> {
if dimension >= self.detectors.len() {
return Err(StreamError::InvalidInput(format!(
"Dimension {} out of range",
dimension
)));
}
Ok(self.detectors[dimension].get_stats().await)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_zscore_detection() {
let config = AnomalyConfig {
warmup_samples: 10,
initial_threshold: 2.0,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..100 {
let value = 50.0 + (i as f64 % 10.0) - 5.0;
detector.detect(value).await.unwrap();
}
let result = detector.detect(1000.0).await.unwrap();
assert!(result.is_some());
let anomaly = result.unwrap();
assert!(anomaly.score > 2.0);
}
#[tokio::test]
async fn test_modified_zscore() {
let config = AnomalyConfig {
warmup_samples: 10,
use_ensemble: false,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..50 {
let value = 10.0 + (i % 5) as f64; detector.detect(value).await.unwrap();
}
let result = detector
.detect_with(100.0, DetectorType::ModifiedZScore)
.await
.unwrap();
assert!(result.is_some());
}
#[tokio::test]
async fn test_iqr_detection() {
let config = AnomalyConfig {
warmup_samples: 10,
use_ensemble: false,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..100 {
let value = 50.0 + (i % 20) as f64 - 10.0;
detector.detect(value).await.unwrap();
}
let result = detector
.detect_with(200.0, DetectorType::IQR)
.await
.unwrap();
assert!(result.is_some());
}
#[tokio::test]
async fn test_ewma_detection() {
let config = AnomalyConfig {
warmup_samples: 10,
use_ensemble: true, ..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..100 {
let value = 50.0 + ((i as f64).sin() * 5.0);
detector.detect(value).await.unwrap();
}
let result = detector.detect(200.0).await.unwrap();
assert!(
result.is_some(),
"Ensemble (including EWMA) should detect extreme outlier"
);
}
#[tokio::test]
async fn test_ensemble_detection() {
let config = AnomalyConfig {
warmup_samples: 20,
use_ensemble: true,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..100 {
let value = 50.0 + (i as f64).sin() * 5.0;
detector.detect(value).await.unwrap();
}
let result = detector.detect(500.0).await.unwrap();
assert!(result.is_some());
if let Some(anomaly) = result {
assert!(anomaly.context.contains_key("votes"));
}
}
#[tokio::test]
async fn test_severity_levels() {
let config = AnomalyConfig {
warmup_samples: 10,
initial_threshold: 2.0,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..50 {
let value = 100.0 + (i % 10) as f64; detector.detect(value).await.unwrap();
}
let result = detector.detect(115.0).await.unwrap();
if let Some(anomaly) = result {
assert!(anomaly.severity <= AnomalySeverity::Medium);
}
let result = detector.detect(1000.0).await.unwrap();
assert!(result.is_some());
assert!(result.unwrap().severity >= AnomalySeverity::High);
}
#[tokio::test]
async fn test_adaptive_threshold() {
let config = AnomalyConfig {
warmup_samples: 10,
adaptation_rate: 0.2,
use_ensemble: false,
..Default::default()
};
let detector = AnomalyDetector::new(config);
let initial_threshold = detector.get_threshold().await;
assert_eq!(initial_threshold, 3.0);
detector.set_threshold(4.0).await;
let new_threshold = detector.get_threshold().await;
assert_eq!(new_threshold, 4.0);
for i in 0..100 {
let value = 50.0 + (i % 20) as f64;
detector.detect(value).await.unwrap();
}
let stats = detector.get_stats().await;
assert_eq!(stats.total_samples, 100);
assert!(stats.current_mean > 0.0);
let result = detector.detect(300.0).await.unwrap();
let _is_anomaly = result.is_some();
}
#[tokio::test]
async fn test_statistics() {
let config = AnomalyConfig {
warmup_samples: 10,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for i in 0..100 {
detector.detect(i as f64).await.unwrap();
}
let stats = detector.get_stats().await;
assert_eq!(stats.total_samples, 100);
assert!(stats.current_mean > 0.0);
}
#[tokio::test]
async fn test_reset() {
let config = AnomalyConfig::default();
let detector = AnomalyDetector::new(config);
for i in 0..100 {
detector.detect(i as f64).await.unwrap();
}
detector.reset().await;
let stats = detector.get_stats().await;
assert_eq!(stats.total_samples, 0);
}
#[tokio::test]
async fn test_cusum_detection() {
let config = AnomalyConfig {
warmup_samples: 10,
use_ensemble: false,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for _ in 0..50 {
detector.detect(100.0).await.unwrap();
}
for _ in 0..10 {
let result = detector
.detect_with(200.0, DetectorType::CUSUM)
.await
.unwrap();
if result.is_some() {
return; }
}
}
#[tokio::test]
async fn test_batch_detection() {
let config = AnomalyConfig {
warmup_samples: 10,
..Default::default()
};
let detector = AnomalyDetector::new(config);
for _ in 0..50 {
detector.detect(100.0).await.unwrap();
}
let values: Vec<f64> = vec![100.0, 101.0, 1000.0, 102.0, 999.0];
let results = detector.detect_batch(&values).await.unwrap();
assert_eq!(results.len(), 5);
let anomaly_count = results.iter().filter(|r| r.is_some()).count();
assert!(anomaly_count >= 1);
}
#[tokio::test]
async fn test_multi_dimensional() {
let config = AnomalyConfig {
warmup_samples: 10,
use_ensemble: false,
..Default::default()
};
let detector = MultiDimensionalDetector::new(3, config);
for _ in 0..50 {
detector.detect(&[10.0, 20.0, 30.0]).await.unwrap();
}
let results = detector.detect(&[1000.0, 20.0, 30.0]).await.unwrap();
assert!(results[0].is_some());
assert!(results[0].as_ref().unwrap().feature_index == Some(0));
}
#[tokio::test]
async fn test_mahalanobis_score() {
let config = AnomalyConfig {
warmup_samples: 10,
..Default::default()
};
let detector = MultiDimensionalDetector::new(2, config);
for _ in 0..100 {
detector.detect(&[10.0, 20.0]).await.unwrap();
}
let normal_score = detector.mahalanobis_score(&[10.0, 20.0]).await;
let anomaly_score = detector.mahalanobis_score(&[100.0, 200.0]).await;
assert!(anomaly_score > normal_score);
}
#[tokio::test]
async fn test_alert_generation() {
let config = AnomalyConfig {
warmup_samples: 10,
alert_cooldown: Duration::from_millis(10),
..Default::default()
};
let detector = AnomalyDetector::new(config);
for _ in 0..50 {
detector.detect(100.0).await.unwrap();
}
detector.detect(10000.0).await.unwrap();
tokio::time::sleep(Duration::from_millis(20)).await;
let alerts = detector.get_alerts(10).await;
assert!(alerts.len() <= 1);
}
#[tokio::test]
async fn test_acknowledge_alert() {
let config = AnomalyConfig {
warmup_samples: 10,
alert_cooldown: Duration::from_millis(1),
..Default::default()
};
let detector = AnomalyDetector::new(config);
for _ in 0..50 {
detector.detect(100.0).await.unwrap();
}
detector.detect(10000.0).await.unwrap();
tokio::time::sleep(Duration::from_millis(5)).await;
let alerts = detector.get_alerts(10).await;
if !alerts.is_empty() {
detector
.acknowledge_alert(&alerts[0].alert_id)
.await
.unwrap();
let updated_alerts = detector.get_alerts(10).await;
assert!(updated_alerts[0].acknowledged);
}
}
}