term_guard/analyzers/anomaly/
detector.rs

1//! Anomaly detection framework for data quality monitoring.
2//!
3//! This module provides infrastructure for detecting anomalies in metrics over time
4//! using various statistical methods. It's designed to work with the analyzer framework
5//! to monitor data quality metrics and alert when significant deviations occur.
6//!
7//! ## Architecture
8//!
9//! The anomaly detection system consists of:
10//! - `AnomalyDetector`: Core trait for anomaly detection strategies
11//! - `MetricsRepository`: Storage abstraction for historical metrics
12//! - Detection strategies: RelativeRateOfChange, AbsoluteChange, Z-score
13//! - `AnomalyDetectionRunner`: Orchestrates detection across metrics
14//!
15//! ## Example
16//!
17//! ```rust,ignore
18//! use term_guard::analyzers::anomaly::{
19//!     AnomalyDetectionRunner, InMemoryMetricsRepository,
20//!     RelativeRateOfChangeDetector, ZScoreDetector
21//! };
22//! use term_guard::analyzers::AnalysisRunner;
23//! use datafusion::prelude::*;
24//!
25//! # tokio::runtime::Runtime::new().unwrap().block_on(async {
26//! // Create metrics repository
27//! let repository = InMemoryMetricsRepository::new();
28//!
29//! // Create detection runner with strategies
30//! let detector = AnomalyDetectionRunner::builder()
31//!     .repository(Box::new(repository))
32//!     .add_detector("completeness.*", Box::new(RelativeRateOfChangeDetector::new(0.1)))
33//!     .add_detector("size", Box::new(ZScoreDetector::new(3.0)))
34//!     .build();
35//!
36//! // Run analysis
37//! let ctx = SessionContext::new();
38//! let runner = AnalysisRunner::new();
39//! let metrics = runner.run(&ctx).await?;
40//!
41//! // Detect anomalies
42//! let anomalies = detector.detect_anomalies(&metrics).await?;
43//! for anomaly in anomalies {
44//!     println!("Anomaly detected in {}: {} (confidence: {:.2})",
45//!              anomaly.metric_name, anomaly.description, anomaly.confidence);
46//! }
47//! # Ok::<(), Box<dyn std::error::Error>>(())
48//! # });
49//! ```
50
51use std::collections::HashMap;
52use std::sync::Arc;
53
54use async_trait::async_trait;
55use chrono::{DateTime, Duration, Utc};
56use serde::{Deserialize, Serialize};
57use tracing::{debug, info, instrument, warn};
58
59use crate::analyzers::{AnalyzerContext, AnalyzerError, AnalyzerResult, MetricValue};
60
61/// Represents a detected anomaly in a metric.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct Anomaly {
64    /// The name of the metric where the anomaly was detected.
65    pub metric_name: String,
66
67    /// The current value of the metric.
68    pub current_value: MetricValue,
69
70    /// The expected value or range.
71    pub expected_value: Option<MetricValue>,
72
73    /// The detection strategy that identified this anomaly.
74    pub detection_strategy: String,
75
76    /// Confidence score of the anomaly (0.0 to 1.0).
77    pub confidence: f64,
78
79    /// Human-readable description of the anomaly.
80    pub description: String,
81
82    /// Timestamp when the anomaly was detected.
83    pub detected_at: DateTime<Utc>,
84
85    /// Additional context or metadata.
86    pub metadata: HashMap<String, String>,
87}
88
89impl Anomaly {
90    /// Creates a new anomaly with the given parameters.
91    pub fn new(
92        metric_name: String,
93        current_value: MetricValue,
94        detection_strategy: String,
95        confidence: f64,
96        description: String,
97    ) -> Self {
98        Self {
99            metric_name,
100            current_value,
101            expected_value: None,
102            detection_strategy,
103            confidence,
104            description,
105            detected_at: Utc::now(),
106            metadata: HashMap::new(),
107        }
108    }
109
110    /// Sets the expected value for this anomaly.
111    pub fn with_expected_value(mut self, value: MetricValue) -> Self {
112        self.expected_value = Some(value);
113        self
114    }
115
116    /// Adds metadata to this anomaly.
117    pub fn with_metadata(mut self, key: String, value: String) -> Self {
118        self.metadata.insert(key, value);
119        self
120    }
121}
122
123/// Historical data point for a metric.
124#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct MetricDataPoint {
126    /// The metric value.
127    pub value: MetricValue,
128
129    /// When this value was recorded.
130    pub timestamp: DateTime<Utc>,
131
132    /// Optional metadata about the data point.
133    pub metadata: HashMap<String, String>,
134}
135
136/// Trait for anomaly detection strategies.
137#[async_trait]
138pub trait AnomalyDetector: Send + Sync {
139    /// Detects anomalies in the given metric.
140    ///
141    /// # Arguments
142    /// * `metric_name` - Name of the metric being analyzed
143    /// * `current_value` - The current value to check
144    /// * `history` - Historical values for comparison
145    ///
146    /// # Returns
147    /// An optional anomaly if one is detected
148    async fn detect(
149        &self,
150        metric_name: &str,
151        current_value: &MetricValue,
152        history: &[MetricDataPoint],
153    ) -> AnalyzerResult<Option<Anomaly>>;
154
155    /// Returns the name of this detection strategy.
156    fn name(&self) -> &str;
157
158    /// Returns a description of this detection strategy.
159    fn description(&self) -> &str;
160}
161
162/// Trait for storing and retrieving metric history.
163#[async_trait]
164pub trait MetricsRepository: Send + Sync {
165    /// Stores a metric value.
166    async fn store_metric(
167        &self,
168        metric_name: &str,
169        value: MetricValue,
170        timestamp: DateTime<Utc>,
171    ) -> AnalyzerResult<()>;
172
173    /// Retrieves historical values for a metric.
174    ///
175    /// # Arguments
176    /// * `metric_name` - Name of the metric
177    /// * `since` - Optional start time for the history
178    /// * `until` - Optional end time for the history
179    /// * `limit` - Maximum number of data points to return
180    async fn get_metric_history(
181        &self,
182        metric_name: &str,
183        since: Option<DateTime<Utc>>,
184        until: Option<DateTime<Utc>>,
185        limit: Option<usize>,
186    ) -> AnalyzerResult<Vec<MetricDataPoint>>;
187
188    /// Stores metrics from an analyzer context.
189    async fn store_context(&self, context: &AnalyzerContext) -> AnalyzerResult<()> {
190        let timestamp = Utc::now();
191        for (metric_name, value) in context.all_metrics() {
192            self.store_metric(metric_name, value.clone(), timestamp)
193                .await?;
194        }
195        Ok(())
196    }
197}
198
/// Limits that bound how much data the in-memory metrics repository retains.
#[derive(Debug, Clone)]
pub struct InMemoryMetricsConfig {
    /// Maximum number of data points kept for each metric (default: 10,000).
    pub max_points_per_metric: usize,
    /// Maximum number of distinct metrics tracked (default: 1,000).
    pub max_metrics: usize,
    /// Maximum age of a data point, in seconds (default: 30 days).
    pub max_age_seconds: i64,
}

impl Default for InMemoryMetricsConfig {
    fn default() -> Self {
        const SECONDS_PER_DAY: i64 = 24 * 60 * 60;
        let max_age_seconds = 30 * SECONDS_PER_DAY;
        Self {
            max_age_seconds,
            max_metrics: 1_000,
            max_points_per_metric: 10_000,
        }
    }
}
219
220/// In-memory implementation of MetricsRepository for testing and development.
221///
222/// **Security Note**: This implementation includes memory limits to prevent OOM attacks
223/// when used in production scenarios. Configure limits appropriately for your use case.
224#[derive(Clone)]
225pub struct InMemoryMetricsRepository {
226    data: Arc<tokio::sync::RwLock<HashMap<String, Vec<MetricDataPoint>>>>,
227    config: InMemoryMetricsConfig,
228}
229
230impl InMemoryMetricsRepository {
231    /// Creates a new in-memory metrics repository with default limits.
232    pub fn new() -> Self {
233        Self::with_config(InMemoryMetricsConfig::default())
234    }
235
236    /// Creates a new in-memory metrics repository with custom configuration.
237    pub fn with_config(config: InMemoryMetricsConfig) -> Self {
238        Self {
239            data: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
240            config,
241        }
242    }
243
244    /// Returns the current memory usage statistics.
245    pub async fn memory_stats(&self) -> MemoryStats {
246        let data = self.data.read().await;
247        let mut total_points = 0;
248        let mut oldest_timestamp = None;
249        let mut newest_timestamp = None;
250
251        for history in data.values() {
252            total_points += history.len();
253            for point in history {
254                match oldest_timestamp {
255                    None => oldest_timestamp = Some(point.timestamp),
256                    Some(oldest) if point.timestamp < oldest => {
257                        oldest_timestamp = Some(point.timestamp)
258                    }
259                    _ => {}
260                }
261                match newest_timestamp {
262                    None => newest_timestamp = Some(point.timestamp),
263                    Some(newest) if point.timestamp > newest => {
264                        newest_timestamp = Some(point.timestamp)
265                    }
266                    _ => {}
267                }
268            }
269        }
270
271        MemoryStats {
272            total_metrics: data.len(),
273            total_data_points: total_points,
274            oldest_data_point: oldest_timestamp,
275            newest_data_point: newest_timestamp,
276            estimated_memory_bytes: Self::estimate_memory_usage(&data),
277        }
278    }
279
280    /// Estimates memory usage in bytes.
281    fn estimate_memory_usage(data: &HashMap<String, Vec<MetricDataPoint>>) -> usize {
282        let mut size = std::mem::size_of::<HashMap<String, Vec<MetricDataPoint>>>();
283
284        for (key, values) in data {
285            size += std::mem::size_of::<String>() + key.len();
286            size += std::mem::size_of::<Vec<MetricDataPoint>>();
287            size += values.len() * std::mem::size_of::<MetricDataPoint>();
288
289            // Estimate metadata size
290            for point in values {
291                for (k, v) in &point.metadata {
292                    size += std::mem::size_of::<String>() * 2 + k.len() + v.len();
293                }
294            }
295        }
296
297        size
298    }
299
300    /// Performs cleanup of old data points based on configuration limits.
301    async fn cleanup_if_needed(&self) {
302        let mut data = self.data.write().await;
303
304        // Remove metrics if we exceed the limit
305        if data.len() > self.config.max_metrics {
306            warn!(
307                current_metrics = data.len(),
308                max_metrics = self.config.max_metrics,
309                "Metrics limit exceeded, removing oldest metrics"
310            );
311
312            // Keep the most recently updated metrics
313            let mut metrics_by_latest: Vec<_> = data
314                .iter()
315                .map(|(name, points)| {
316                    let latest = points.iter().map(|p| p.timestamp).max().unwrap_or_default();
317                    (name.clone(), latest)
318                })
319                .collect();
320
321            metrics_by_latest.sort_by(|a, b| b.1.cmp(&a.1)); // Sort by latest timestamp desc
322
323            // Remove oldest metrics
324            let to_remove = metrics_by_latest.len() - self.config.max_metrics;
325            for (metric_name, _) in metrics_by_latest.iter().skip(self.config.max_metrics) {
326                data.remove(metric_name);
327            }
328
329            info!(
330                removed_metrics = to_remove,
331                remaining_metrics = data.len(),
332                "Cleaned up old metrics"
333            );
334        }
335
336        // Clean up old data points and enforce per-metric limits
337        let cutoff_time = Utc::now() - Duration::seconds(self.config.max_age_seconds);
338        let mut total_points_removed = 0;
339
340        for (metric_name, points) in data.iter_mut() {
341            let original_len = points.len();
342
343            // Remove points older than cutoff
344            points.retain(|p| p.timestamp >= cutoff_time);
345
346            // Limit points per metric (keep most recent)
347            if points.len() > self.config.max_points_per_metric {
348                points.sort_by_key(|p| p.timestamp);
349                let to_keep = points.len() - self.config.max_points_per_metric;
350                points.drain(0..to_keep);
351            }
352
353            let removed = original_len - points.len();
354            if removed > 0 {
355                total_points_removed += removed;
356                debug!(
357                    metric = metric_name,
358                    removed_points = removed,
359                    remaining_points = points.len(),
360                    "Cleaned up old data points"
361                );
362            }
363        }
364
365        if total_points_removed > 0 {
366            info!(
367                total_removed = total_points_removed,
368                "Completed data point cleanup"
369            );
370        }
371    }
372}
373
374/// Memory usage statistics for the in-memory repository.
375#[derive(Debug, Clone, Serialize, Deserialize)]
376pub struct MemoryStats {
377    /// Total number of metrics stored.
378    pub total_metrics: usize,
379    /// Total number of data points across all metrics.
380    pub total_data_points: usize,
381    /// Timestamp of the oldest data point.
382    pub oldest_data_point: Option<DateTime<Utc>>,
383    /// Timestamp of the newest data point.
384    pub newest_data_point: Option<DateTime<Utc>>,
385    /// Estimated memory usage in bytes.
386    pub estimated_memory_bytes: usize,
387}
388
389impl Default for InMemoryMetricsRepository {
390    fn default() -> Self {
391        Self::new()
392    }
393}
394
395#[async_trait]
396impl MetricsRepository for InMemoryMetricsRepository {
397    async fn store_metric(
398        &self,
399        metric_name: &str,
400        value: MetricValue,
401        timestamp: DateTime<Utc>,
402    ) -> AnalyzerResult<()> {
403        // Check limits before storing
404        {
405            let data = self.data.read().await;
406            if data.len() >= self.config.max_metrics && !data.contains_key(metric_name) {
407                return Err(AnalyzerError::Custom(format!(
408                    "Maximum metrics limit ({}) exceeded",
409                    self.config.max_metrics
410                )));
411            }
412
413            if let Some(existing) = data.get(metric_name) {
414                if existing.len() >= self.config.max_points_per_metric {
415                    // Will be handled by cleanup, but log a warning
416                    warn!(
417                        metric = metric_name,
418                        current_points = existing.len(),
419                        max_points = self.config.max_points_per_metric,
420                        "Metric approaching memory limit"
421                    );
422                }
423            }
424        }
425
426        // Store the new data point
427        {
428            let mut data = self.data.write().await;
429            let entry = data.entry(metric_name.to_string()).or_insert_with(Vec::new);
430            entry.push(MetricDataPoint {
431                value,
432                timestamp,
433                metadata: HashMap::new(),
434            });
435            // Keep entries sorted by timestamp
436            entry.sort_by_key(|dp| dp.timestamp);
437        }
438
439        // Perform cleanup if needed (async to avoid holding lock too long)
440        self.cleanup_if_needed().await;
441
442        Ok(())
443    }
444
445    async fn get_metric_history(
446        &self,
447        metric_name: &str,
448        since: Option<DateTime<Utc>>,
449        until: Option<DateTime<Utc>>,
450        limit: Option<usize>,
451    ) -> AnalyzerResult<Vec<MetricDataPoint>> {
452        let data = self.data.read().await;
453
454        if let Some(history) = data.get(metric_name) {
455            let mut filtered: Vec<_> = history
456                .iter()
457                .filter(|dp| {
458                    let after_since = since.map_or(true, |s| dp.timestamp >= s);
459                    let before_until = until.map_or(true, |u| dp.timestamp <= u);
460                    after_since && before_until
461                })
462                .cloned()
463                .collect();
464
465            // Apply limit if specified
466            if let Some(limit) = limit {
467                filtered.truncate(limit);
468            }
469
470            Ok(filtered)
471        } else {
472            Ok(Vec::new())
473        }
474    }
475}
476
/// Flags a metric whose value moved, relative to the last historical value,
/// by more than a configured fraction.
pub struct RelativeRateOfChangeDetector {
    /// Largest tolerated relative change, as a fraction (0.1 means 10%).
    pub max_rate_of_change: f64,

    /// Number of historical points required before detection is attempted.
    pub min_history_size: usize,
}

impl RelativeRateOfChangeDetector {
    /// Creates a detector with the given threshold and a minimum history of 2 points.
    ///
    /// # Arguments
    /// * `max_rate_of_change` - Largest tolerated relative change (e.g. 0.1 for 10%)
    pub fn new(max_rate_of_change: f64) -> Self {
        const DEFAULT_MIN_HISTORY: usize = 2;
        Self {
            min_history_size: DEFAULT_MIN_HISTORY,
            max_rate_of_change,
        }
    }

    /// Replaces the minimum number of historical points required.
    pub fn with_min_history_size(self, size: usize) -> Self {
        Self {
            min_history_size: size,
            ..self
        }
    }
}
504
505#[async_trait]
506impl AnomalyDetector for RelativeRateOfChangeDetector {
507    async fn detect(
508        &self,
509        metric_name: &str,
510        current_value: &MetricValue,
511        history: &[MetricDataPoint],
512    ) -> AnalyzerResult<Option<Anomaly>> {
513        if history.len() < self.min_history_size {
514            debug!(
515                metric = metric_name,
516                history_size = history.len(),
517                required = self.min_history_size,
518                "Insufficient history for rate of change detection"
519            );
520            return Ok(None);
521        }
522
523        // Get the most recent historical value
524        let previous = history.last().unwrap();
525
526        debug!(
527            metric = metric_name,
528            current = ?current_value,
529            previous = ?previous.value,
530            "Comparing values for rate of change"
531        );
532
533        // Calculate rate of change for numeric metrics
534        match (current_value, &previous.value) {
535            (MetricValue::Long(current), MetricValue::Long(previous)) => {
536                if *previous == 0 {
537                    return Ok(None); // Can't calculate rate of change from zero
538                }
539
540                let rate_of_change = ((*current - *previous) as f64).abs() / (*previous as f64);
541
542                debug!(
543                    metric = metric_name,
544                    rate_of_change = rate_of_change,
545                    threshold = self.max_rate_of_change,
546                    "Calculated rate of change"
547                );
548
549                if rate_of_change > self.max_rate_of_change {
550                    let anomaly = Anomaly::new(
551                        metric_name.to_string(),
552                        current_value.clone(),
553                        self.name().to_string(),
554                        rate_of_change / self.max_rate_of_change, // Confidence based on severity
555                        format!(
556                            "Relative change of {:.1}% exceeds threshold of {:.1}%",
557                            rate_of_change * 100.0,
558                            self.max_rate_of_change * 100.0
559                        ),
560                    )
561                    .with_expected_value(MetricValue::Long(*previous))
562                    .with_metadata("rate_of_change".to_string(), format!("{rate_of_change:.4}"));
563
564                    return Ok(Some(anomaly));
565                }
566            }
567            (MetricValue::Double(current), MetricValue::Double(previous)) => {
568                if *previous == 0.0 {
569                    return Ok(None); // Can't calculate rate of change from zero
570                }
571
572                let rate_of_change = ((current - previous).abs()) / previous.abs();
573
574                if rate_of_change > self.max_rate_of_change {
575                    let anomaly = Anomaly::new(
576                        metric_name.to_string(),
577                        current_value.clone(),
578                        self.name().to_string(),
579                        rate_of_change / self.max_rate_of_change, // Confidence based on severity
580                        format!(
581                            "Relative change of {:.1}% exceeds threshold of {:.1}%",
582                            rate_of_change * 100.0,
583                            self.max_rate_of_change * 100.0
584                        ),
585                    )
586                    .with_expected_value(MetricValue::Double(*previous))
587                    .with_metadata("rate_of_change".to_string(), format!("{rate_of_change:.4}"));
588
589                    return Ok(Some(anomaly));
590                }
591            }
592            _ => {
593                // Non-numeric metrics or type mismatch
594                return Ok(None);
595            }
596        }
597
598        Ok(None)
599    }
600
601    fn name(&self) -> &str {
602        "RelativeRateOfChange"
603    }
604
605    fn description(&self) -> &str {
606        "Detects anomalies when the relative rate of change exceeds a threshold"
607    }
608}
609
/// Flags a metric whose value moved, compared with the last historical value,
/// by more than a fixed amount.
pub struct AbsoluteChangeDetector {
    /// Largest tolerated absolute difference.
    pub max_absolute_change: f64,

    /// Number of historical points required before detection is attempted.
    pub min_history_size: usize,
}

impl AbsoluteChangeDetector {
    /// Creates a detector with the given threshold and a minimum history of 1 point.
    ///
    /// # Arguments
    /// * `max_absolute_change` - Largest tolerated absolute difference
    pub fn new(max_absolute_change: f64) -> Self {
        const DEFAULT_MIN_HISTORY: usize = 1;
        Self {
            min_history_size: DEFAULT_MIN_HISTORY,
            max_absolute_change,
        }
    }

    /// Replaces the minimum number of historical points required.
    pub fn with_min_history_size(self, size: usize) -> Self {
        Self {
            min_history_size: size,
            ..self
        }
    }
}
637
638#[async_trait]
639impl AnomalyDetector for AbsoluteChangeDetector {
640    async fn detect(
641        &self,
642        metric_name: &str,
643        current_value: &MetricValue,
644        history: &[MetricDataPoint],
645    ) -> AnalyzerResult<Option<Anomaly>> {
646        if history.len() < self.min_history_size {
647            return Ok(None);
648        }
649
650        let previous = history.last().unwrap();
651
652        match (current_value, &previous.value) {
653            (MetricValue::Long(current), MetricValue::Long(previous)) => {
654                let change = (*current - *previous).abs() as f64;
655
656                if change > self.max_absolute_change {
657                    let anomaly = Anomaly::new(
658                        metric_name.to_string(),
659                        current_value.clone(),
660                        self.name().to_string(),
661                        change / self.max_absolute_change,
662                        format!(
663                            "Absolute change of {change} exceeds threshold of {}",
664                            self.max_absolute_change
665                        ),
666                    )
667                    .with_expected_value(MetricValue::Long(*previous))
668                    .with_metadata("absolute_change".to_string(), format!("{change}"));
669
670                    return Ok(Some(anomaly));
671                }
672            }
673            (MetricValue::Double(current), MetricValue::Double(previous)) => {
674                let change = (current - previous).abs();
675
676                if change > self.max_absolute_change {
677                    let anomaly = Anomaly::new(
678                        metric_name.to_string(),
679                        current_value.clone(),
680                        self.name().to_string(),
681                        change / self.max_absolute_change,
682                        format!(
683                            "Absolute change of {change:.4} exceeds threshold of {:.4}",
684                            self.max_absolute_change
685                        ),
686                    )
687                    .with_expected_value(MetricValue::Double(*previous))
688                    .with_metadata("absolute_change".to_string(), format!("{change:.4}"));
689
690                    return Ok(Some(anomaly));
691                }
692            }
693            _ => return Ok(None),
694        }
695
696        Ok(None)
697    }
698
699    fn name(&self) -> &str {
700        "AbsoluteChange"
701    }
702
703    fn description(&self) -> &str {
704        "Detects anomalies when the absolute change exceeds a threshold"
705    }
706}
707
/// Flags values that lie too many standard deviations away from the mean of
/// their history.
pub struct ZScoreDetector {
    /// Number of standard deviations beyond which a value is anomalous (e.g. 3.0).
    pub z_score_threshold: f64,

    /// Number of (numeric) historical points required before detection is attempted.
    pub min_history_size: usize,
}

impl ZScoreDetector {
    /// Creates a detector with the given threshold and a minimum history of 10 points.
    ///
    /// # Arguments
    /// * `z_score_threshold` - Standard-deviation distance that counts as anomalous
    pub fn new(z_score_threshold: f64) -> Self {
        const DEFAULT_MIN_HISTORY: usize = 10;
        Self {
            min_history_size: DEFAULT_MIN_HISTORY,
            z_score_threshold,
        }
    }

    /// Replaces the minimum number of historical points required.
    pub fn with_min_history_size(self, size: usize) -> Self {
        Self {
            min_history_size: size,
            ..self
        }
    }
}
735
736#[async_trait]
737impl AnomalyDetector for ZScoreDetector {
738    async fn detect(
739        &self,
740        metric_name: &str,
741        current_value: &MetricValue,
742        history: &[MetricDataPoint],
743    ) -> AnalyzerResult<Option<Anomaly>> {
744        if history.len() < self.min_history_size {
745            return Ok(None);
746        }
747
748        // Extract numeric values from history
749        let numeric_values: Vec<f64> = history
750            .iter()
751            .filter_map(|dp| match &dp.value {
752                MetricValue::Long(v) => Some(*v as f64),
753                MetricValue::Double(v) => Some(*v),
754                _ => None,
755            })
756            .collect();
757
758        if numeric_values.len() < self.min_history_size {
759            return Ok(None);
760        }
761
762        // Calculate mean and standard deviation
763        let mean = numeric_values.iter().sum::<f64>() / numeric_values.len() as f64;
764        let variance = numeric_values
765            .iter()
766            .map(|v| (v - mean).powi(2))
767            .sum::<f64>()
768            / numeric_values.len() as f64;
769        let std_dev = variance.sqrt();
770
771        // Can't calculate Z-score if standard deviation is zero
772        if std_dev == 0.0 {
773            return Ok(None);
774        }
775
776        // Calculate Z-score for current value
777        let current_numeric = match current_value {
778            MetricValue::Long(v) => *v as f64,
779            MetricValue::Double(v) => *v,
780            _ => return Ok(None),
781        };
782
783        let z_score = (current_numeric - mean).abs() / std_dev;
784
785        if z_score > self.z_score_threshold {
786            let anomaly = Anomaly::new(
787                metric_name.to_string(),
788                current_value.clone(),
789                self.name().to_string(),
790                (z_score / self.z_score_threshold).min(1.0),
791                format!(
792                    "Value is {z_score:.1} standard deviations from mean (threshold: {:.1})",
793                    self.z_score_threshold
794                ),
795            )
796            .with_expected_value(MetricValue::Double(mean))
797            .with_metadata("z_score".to_string(), format!("{z_score:.2}"))
798            .with_metadata("mean".to_string(), format!("{mean:.4}"))
799            .with_metadata("std_dev".to_string(), format!("{std_dev:.4}"));
800
801            return Ok(Some(anomaly));
802        }
803
804        Ok(None)
805    }
806
807    fn name(&self) -> &str {
808        "ZScore"
809    }
810
811    fn description(&self) -> &str {
812        "Detects anomalies using statistical Z-score analysis"
813    }
814}
815
816/// Configuration for anomaly detection.
817#[derive(Debug, Clone)]
818pub struct AnomalyDetectionConfig {
819    /// Minimum confidence threshold for reporting anomalies.
820    pub min_confidence: f64,
821
822    /// Whether to store current metrics in the repository.
823    pub store_current_metrics: bool,
824
825    /// Default time window for historical data retrieval.
826    pub default_history_window: Duration,
827}
828
829impl Default for AnomalyDetectionConfig {
830    fn default() -> Self {
831        Self {
832            min_confidence: 0.7,
833            store_current_metrics: true,
834            default_history_window: Duration::days(30),
835        }
836    }
837}
838
839/// Orchestrates anomaly detection across metrics.
840pub struct AnomalyDetectionRunner {
841    repository: Box<dyn MetricsRepository>,
842    detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
843    config: AnomalyDetectionConfig,
844}
845
846impl AnomalyDetectionRunner {
847    /// Creates a new builder for the anomaly detection runner.
848    pub fn builder() -> AnomalyDetectionRunnerBuilder {
849        AnomalyDetectionRunnerBuilder::default()
850    }
851
852    /// Detects anomalies in the given metrics.
853    #[instrument(skip(self, context))]
854    pub async fn detect_anomalies(
855        &self,
856        context: &AnalyzerContext,
857    ) -> AnalyzerResult<Vec<Anomaly>> {
858        let mut anomalies = Vec::new();
859
860        // Remember the current time BEFORE storing metrics
861        // Subtract 1ms to ensure current metrics stored after this time are excluded
862        let detection_time = Utc::now() - chrono::Duration::milliseconds(1);
863
864        // Store current metrics if configured
865        if self.config.store_current_metrics {
866            self.repository.store_context(context).await?;
867        }
868
869        // Check each metric against configured detectors
870        for (metric_name, metric_value) in context.all_metrics() {
871            for (pattern, detector) in &self.detectors {
872                // Check if metric name matches pattern
873                if self.matches_pattern(metric_name, pattern) {
874                    // Get historical data, excluding the current timestamp
875                    let since = Utc::now() - self.config.default_history_window;
876                    let history = self
877                        .repository
878                        .get_metric_history(metric_name, Some(since), Some(detection_time), None)
879                        .await?;
880
881                    debug!(
882                        metric = metric_name,
883                        history_size = history.len(),
884                        current_value = ?metric_value,
885                        "Running anomaly detection"
886                    );
887
888                    // Run detection
889                    match detector.detect(metric_name, metric_value, &history).await {
890                        Ok(Some(anomaly)) => {
891                            if anomaly.confidence >= self.config.min_confidence {
892                                info!(
893                                    metric = metric_name,
894                                    strategy = anomaly.detection_strategy,
895                                    confidence = anomaly.confidence,
896                                    "Anomaly detected"
897                                );
898                                anomalies.push(anomaly);
899                            }
900                        }
901                        Ok(None) => {
902                            // No anomaly detected
903                        }
904                        Err(e) => {
905                            warn!(
906                                metric = metric_name,
907                                detector = detector.name(),
908                                error = %e,
909                                "Error during anomaly detection"
910                            );
911                        }
912                    }
913                }
914            }
915        }
916
917        Ok(anomalies)
918    }
919
920    /// Checks if a metric name matches a pattern.
921    fn matches_pattern(&self, metric_name: &str, pattern: &str) -> bool {
922        if pattern == "*" {
923            return true;
924        }
925
926        if let Some(prefix) = pattern.strip_suffix('*') {
927            return metric_name.starts_with(prefix);
928        }
929
930        metric_name == pattern
931    }
932}
933
934/// Builder for AnomalyDetectionRunner.
935#[derive(Default)]
936pub struct AnomalyDetectionRunnerBuilder {
937    repository: Option<Box<dyn MetricsRepository>>,
938    detectors: Vec<(String, Box<dyn AnomalyDetector>)>,
939    config: AnomalyDetectionConfig,
940}
941
942impl AnomalyDetectionRunnerBuilder {
943    /// Sets the metrics repository.
944    pub fn repository(mut self, repository: Box<dyn MetricsRepository>) -> Self {
945        self.repository = Some(repository);
946        self
947    }
948
949    /// Adds a detector for metrics matching the given pattern.
950    ///
951    /// # Arguments
952    /// * `pattern` - Metric name pattern (supports * wildcard at end)
953    /// * `detector` - The anomaly detector to use
954    pub fn add_detector(mut self, pattern: &str, detector: Box<dyn AnomalyDetector>) -> Self {
955        self.detectors.push((pattern.to_string(), detector));
956        self
957    }
958
959    /// Sets the configuration.
960    pub fn config(mut self, config: AnomalyDetectionConfig) -> Self {
961        self.config = config;
962        self
963    }
964
965    /// Builds the AnomalyDetectionRunner.
966    pub fn build(self) -> AnalyzerResult<AnomalyDetectionRunner> {
967        let repository = self
968            .repository
969            .ok_or_else(|| AnalyzerError::Custom("Metrics repository is required".to_string()))?;
970
971        Ok(AnomalyDetectionRunner {
972            repository,
973            detectors: self.detectors,
974            config: self.config,
975        })
976    }
977}
978
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a history point with empty metadata, stamped `hours_ago` hours before now.
    fn point(value: MetricValue, hours_ago: i64) -> MetricDataPoint {
        MetricDataPoint {
            value,
            timestamp: Utc::now() - Duration::hours(hours_ago),
            metadata: HashMap::new(),
        }
    }

    #[tokio::test]
    async fn test_relative_rate_of_change_detector() {
        // 10% threshold; a single prior point counts as enough history.
        let detector = RelativeRateOfChangeDetector::new(0.1).with_min_history_size(1);
        let history = vec![point(MetricValue::Long(100), 1)];

        // +5% stays below the threshold.
        let within = detector
            .detect("test_metric", &MetricValue::Long(105), &history)
            .await
            .unwrap();
        assert!(within.is_none());

        // +20% is twice the threshold.
        let anomaly = detector
            .detect("test_metric", &MetricValue::Long(120), &history)
            .await
            .unwrap()
            .expect("a 20% jump should be flagged");
        assert_eq!(anomaly.detection_strategy, "RelativeRateOfChange");
        assert!(anomaly.confidence > 1.0); // roughly 20% / 10% = 2.0
    }

    #[tokio::test]
    async fn test_z_score_detector() {
        let detector = ZScoreDetector::new(2.0); // flag beyond 2 standard deviations

        // Twenty hourly points cycling through 95..=104.
        let history: Vec<MetricDataPoint> = (0..20)
            .map(|i| point(MetricValue::Long(95 + (i % 10)), i))
            .collect();

        let normal = detector
            .detect("test_metric", &MetricValue::Long(102), &history)
            .await
            .unwrap();
        assert!(normal.is_none());

        let anomaly = detector
            .detect("test_metric", &MetricValue::Long(150), &history)
            .await
            .unwrap()
            .expect("150 is far outside the 95..=104 band");
        assert_eq!(anomaly.detection_strategy, "ZScore");
    }

    #[tokio::test]
    async fn test_in_memory_repository_memory_limits() {
        let repo = InMemoryMetricsRepository::with_config(InMemoryMetricsConfig {
            max_metrics: 2,
            max_points_per_metric: 3,
            max_age_seconds: 60,
        });
        let now = Utc::now();

        // Fill the metric-name budget.
        for (name, value) in [("metric1", 100), ("metric2", 200)] {
            repo.store_metric(name, MetricValue::Long(value), now)
                .await
                .unwrap();
        }

        // A third distinct metric exceeds max_metrics and must be rejected.
        let rejected = repo
            .store_metric("metric3", MetricValue::Long(300), now)
            .await;
        assert!(rejected.is_err());

        let stats = repo.memory_stats().await;
        assert_eq!(stats.total_metrics, 2);
        assert_eq!(stats.total_data_points, 2);

        // Existing metrics still accept points; push metric1 past its per-metric cap.
        for step in 1..=3 {
            repo.store_metric(
                "metric1",
                MetricValue::Long(100 + step),
                now + Duration::seconds(step),
            )
            .await
            .unwrap();
        }

        // At most max_points_per_metric points should survive.
        let history = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        assert!(history.len() <= 3);

        assert!(repo.memory_stats().await.estimated_memory_bytes > 0);
    }

    #[tokio::test]
    async fn test_in_memory_repository() {
        let repo = InMemoryMetricsRepository::new();
        let now = Utc::now();

        let writes = [
            ("metric1", MetricValue::Long(100), now),
            ("metric1", MetricValue::Long(110), now + Duration::hours(1)),
            ("metric2", MetricValue::Double(0.95), now),
        ];
        for (name, value, at) in writes {
            repo.store_metric(name, value, at).await.unwrap();
        }

        // Unfiltered history returns both metric1 points in timestamp order.
        let all = repo
            .get_metric_history("metric1", None, None, None)
            .await
            .unwrap();
        let values: Vec<_> = all.iter().map(|p| &p.value).collect();
        assert_eq!(values, [&MetricValue::Long(100), &MetricValue::Long(110)]);

        // A start bound 30 minutes after `now` keeps only the later point.
        let recent = repo
            .get_metric_history("metric1", Some(now + Duration::minutes(30)), None, None)
            .await
            .unwrap();
        assert_eq!(recent.len(), 1);
        assert_eq!(recent[0].value, MetricValue::Long(110));
    }
}