Skip to main content

llm_optimizer_decision/
threshold_monitor.rs

//! Threshold-Based Monitoring
//!
//! This module provides threshold-based monitoring and alerting for metrics,
//! integrating drift detection and anomaly detection.
5
6use chrono::{DateTime, Utc};
7use dashmap::DashMap;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use uuid::Uuid;
11
12use crate::{
13    anomaly_detection::{AnomalyResult, MADDetector, ZScoreDetector},
14    drift_detection::{DriftAlgorithm, DriftStatus, PageHinkley, ADWIN, CUSUM},
15    errors::{DecisionError, Result},
16};
17
18/// Alert severity level
19#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
20pub enum AlertSeverity {
21    /// Informational
22    Info,
23    /// Warning
24    Warning,
25    /// Critical
26    Critical,
27}
28
29/// Alert type
30#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum AlertType {
32    /// Threshold violation
33    ThresholdViolation,
34    /// Drift detected
35    Drift,
36    /// Anomaly detected
37    Anomaly,
38    /// Performance degradation
39    PerformanceDegradation,
40}
41
42/// Alert information
43#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct Alert {
45    /// Alert ID
46    pub id: Uuid,
47    /// Metric name
48    pub metric_name: String,
49    /// Alert type
50    pub alert_type: AlertType,
51    /// Severity
52    pub severity: AlertSeverity,
53    /// Message
54    pub message: String,
55    /// Metric value
56    pub value: f64,
57    /// Threshold
58    pub threshold: Option<f64>,
59    /// Timestamp
60    pub timestamp: DateTime<Utc>,
61    /// Additional context
62    pub context: String,
63}
64
65impl Alert {
66    /// Create new alert
67    pub fn new(
68        metric_name: impl Into<String>,
69        alert_type: AlertType,
70        severity: AlertSeverity,
71        message: impl Into<String>,
72        value: f64,
73        threshold: Option<f64>,
74    ) -> Self {
75        Self {
76            id: Uuid::new_v4(),
77            metric_name: metric_name.into(),
78            alert_type,
79            severity,
80            message: message.into(),
81            value,
82            threshold,
83            timestamp: Utc::now(),
84            context: String::new(),
85        }
86    }
87
88    /// Set context
89    pub fn with_context(mut self, context: impl Into<String>) -> Self {
90        self.context = context.into();
91        self
92    }
93}
94
95/// Metric threshold configuration
96#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ThresholdConfig {
98    /// Minimum acceptable value
99    pub min_value: Option<f64>,
100    /// Maximum acceptable value
101    pub max_value: Option<f64>,
102    /// Warning threshold (lower)
103    pub warning_min: Option<f64>,
104    /// Warning threshold (upper)
105    pub warning_max: Option<f64>,
106    /// Enable drift detection
107    pub enable_drift_detection: bool,
108    /// Drift detection algorithm
109    pub drift_algorithm: DriftAlgorithm,
110    /// Enable anomaly detection
111    pub enable_anomaly_detection: bool,
112    /// Anomaly threshold (z-score)
113    pub anomaly_threshold: f64,
114}
115
116impl Default for ThresholdConfig {
117    fn default() -> Self {
118        Self {
119            min_value: None,
120            max_value: None,
121            warning_min: None,
122            warning_max: None,
123            enable_drift_detection: true,
124            drift_algorithm: DriftAlgorithm::ADWIN,
125            enable_anomaly_detection: true,
126            anomaly_threshold: 3.0,
127        }
128    }
129}
130
131impl ThresholdConfig {
132    /// Create config for quality metrics (0.0 - 1.0)
133    pub fn quality() -> Self {
134        Self {
135            min_value: Some(0.5),
136            max_value: Some(1.0),
137            warning_min: Some(0.7),
138            warning_max: None,
139            enable_drift_detection: true,
140            drift_algorithm: DriftAlgorithm::ADWIN,
141            enable_anomaly_detection: true,
142            anomaly_threshold: 2.5,
143        }
144    }
145
146    /// Create config for cost metrics
147    pub fn cost(max_cost: f64) -> Self {
148        Self {
149            min_value: Some(0.0),
150            max_value: Some(max_cost),
151            warning_min: None,
152            warning_max: Some(max_cost * 0.8),
153            enable_drift_detection: true,
154            drift_algorithm: DriftAlgorithm::PageHinkley,
155            enable_anomaly_detection: true,
156            anomaly_threshold: 3.0,
157        }
158    }
159
160    /// Create config for latency metrics (milliseconds)
161    pub fn latency(max_latency: f64) -> Self {
162        Self {
163            min_value: Some(0.0),
164            max_value: Some(max_latency),
165            warning_min: None,
166            warning_max: Some(max_latency * 0.8),
167            enable_drift_detection: true,
168            drift_algorithm: DriftAlgorithm::CUSUM,
169            enable_anomaly_detection: true,
170            anomaly_threshold: 2.5,
171        }
172    }
173}
174
175/// Metric monitor state
176struct MetricMonitor {
177    /// Configuration
178    config: ThresholdConfig,
179    /// Drift detector (ADWIN)
180    adwin: Option<ADWIN>,
181    /// Drift detector (Page-Hinkley)
182    page_hinkley: Option<PageHinkley>,
183    /// Drift detector (CUSUM)
184    cusum: Option<CUSUM>,
185    /// Anomaly detector (Z-score)
186    zscore: Option<ZScoreDetector>,
187    /// Anomaly detector (MAD)
188    mad: Option<MADDetector>,
189    /// Recent alerts
190    recent_alerts: Vec<Alert>,
191    /// Sample count
192    sample_count: u64,
193}
194
195impl MetricMonitor {
196    /// Create new metric monitor
197    fn new(config: ThresholdConfig) -> Result<Self> {
198        let adwin = if config.enable_drift_detection
199            && config.drift_algorithm == DriftAlgorithm::ADWIN
200        {
201            Some(ADWIN::new(0.002, 100)?)
202        } else {
203            None
204        };
205
206        let page_hinkley = if config.enable_drift_detection
207            && config.drift_algorithm == DriftAlgorithm::PageHinkley
208        {
209            Some(PageHinkley::new(10.0, 0.005)?)
210        } else {
211            None
212        };
213
214        let cusum = if config.enable_drift_detection
215            && config.drift_algorithm == DriftAlgorithm::CUSUM
216        {
217            // Need to estimate target mean - use default for now
218            Some(CUSUM::new(5.0, 0.5, 0.1)?)
219        } else {
220            None
221        };
222
223        let zscore = if config.enable_anomaly_detection {
224            Some(ZScoreDetector::new(50, config.anomaly_threshold)?)
225        } else {
226            None
227        };
228
229        let mad = if config.enable_anomaly_detection {
230            Some(MADDetector::new(50, config.anomaly_threshold)?)
231        } else {
232            None
233        };
234
235        Ok(Self {
236            config,
237            adwin,
238            page_hinkley,
239            cusum,
240            zscore,
241            mad,
242            recent_alerts: Vec::new(),
243            sample_count: 0,
244        })
245    }
246
247    /// Check value and generate alerts
248    fn check_value(&mut self, metric_name: &str, value: f64) -> Vec<Alert> {
249        let mut alerts = Vec::new();
250        self.sample_count += 1;
251
252        // Check hard thresholds
253        if let Some(min) = self.config.min_value {
254            if value < min {
255                alerts.push(
256                    Alert::new(
257                        metric_name,
258                        AlertType::ThresholdViolation,
259                        AlertSeverity::Critical,
260                        format!("Value {} below minimum threshold {}", value, min),
261                        value,
262                        Some(min),
263                    )
264                    .with_context(format!("Sample count: {}", self.sample_count)),
265                );
266            }
267        }
268
269        if let Some(max) = self.config.max_value {
270            if value > max {
271                alerts.push(
272                    Alert::new(
273                        metric_name,
274                        AlertType::ThresholdViolation,
275                        AlertSeverity::Critical,
276                        format!("Value {} exceeds maximum threshold {}", value, max),
277                        value,
278                        Some(max),
279                    )
280                    .with_context(format!("Sample count: {}", self.sample_count)),
281                );
282            }
283        }
284
285        // Check warning thresholds
286        if let Some(warning_min) = self.config.warning_min {
287            if value < warning_min && !alerts.iter().any(|a| matches!(a.alert_type, AlertType::ThresholdViolation)) {
288                alerts.push(
289                    Alert::new(
290                        metric_name,
291                        AlertType::ThresholdViolation,
292                        AlertSeverity::Warning,
293                        format!("Value {} below warning threshold {}", value, warning_min),
294                        value,
295                        Some(warning_min),
296                    )
297                    .with_context(format!("Sample count: {}", self.sample_count)),
298                );
299            }
300        }
301
302        if let Some(warning_max) = self.config.warning_max {
303            if value > warning_max && !alerts.iter().any(|a| matches!(a.alert_type, AlertType::ThresholdViolation)) {
304                alerts.push(
305                    Alert::new(
306                        metric_name,
307                        AlertType::ThresholdViolation,
308                        AlertSeverity::Warning,
309                        format!("Value {} exceeds warning threshold {}", value, warning_max),
310                        value,
311                        Some(warning_max),
312                    )
313                    .with_context(format!("Sample count: {}", self.sample_count)),
314                );
315            }
316        }
317
318        // Check drift
319        if self.config.enable_drift_detection {
320            let drift_status = if let Some(adwin) = &mut self.adwin {
321                adwin.add(value)
322            } else if let Some(ph) = &mut self.page_hinkley {
323                ph.add(value)
324            } else if let Some(cusum) = &mut self.cusum {
325                cusum.add(value)
326            } else {
327                DriftStatus::Stable
328            };
329
330            match drift_status {
331                DriftStatus::Drift => {
332                    alerts.push(
333                        Alert::new(
334                            metric_name,
335                            AlertType::Drift,
336                            AlertSeverity::Critical,
337                            format!(
338                                "Drift detected using {:?}",
339                                self.config.drift_algorithm
340                            ),
341                            value,
342                            None,
343                        )
344                        .with_context(format!("Sample count: {}", self.sample_count)),
345                    );
346                }
347                DriftStatus::Warning => {
348                    alerts.push(
349                        Alert::new(
350                            metric_name,
351                            AlertType::Drift,
352                            AlertSeverity::Warning,
353                            "Possible drift detected".to_string(),
354                            value,
355                            None,
356                        )
357                        .with_context(format!("Sample count: {}", self.sample_count)),
358                    );
359                }
360                DriftStatus::Stable => {}
361            }
362        }
363
364        // Check anomalies
365        if self.config.enable_anomaly_detection {
366            let anomaly_result = if let Some(zscore) = &mut self.zscore {
367                zscore.add(value)
368            } else if let Some(mad) = &mut self.mad {
369                mad.add(value)
370            } else {
371                AnomalyResult::normal(0.0)
372            };
373
374            if anomaly_result.is_anomaly {
375                let severity = if anomaly_result.severity > 0.7 {
376                    AlertSeverity::Critical
377                } else {
378                    AlertSeverity::Warning
379                };
380
381                alerts.push(
382                    Alert::new(
383                        metric_name,
384                        AlertType::Anomaly,
385                        severity,
386                        format!(
387                            "Anomaly detected (score: {:.2}, severity: {:.2})",
388                            anomaly_result.score, anomaly_result.severity
389                        ),
390                        value,
391                        Some(self.config.anomaly_threshold),
392                    )
393                    .with_context(format!("Sample count: {}", self.sample_count)),
394                );
395            }
396        }
397
398        // Store recent alerts
399        for alert in &alerts {
400            self.recent_alerts.push(alert.clone());
401        }
402
403        // Keep only last 100 alerts
404        if self.recent_alerts.len() > 100 {
405            self.recent_alerts.drain(0..self.recent_alerts.len() - 100);
406        }
407
408        alerts
409    }
410}
411
412/// Threshold monitoring system
413pub struct ThresholdMonitoringSystem {
414    /// Monitors per metric
415    monitors: Arc<DashMap<String, MetricMonitor>>,
416    /// Alert handlers
417    alert_handlers: Arc<DashMap<String, Box<dyn Fn(&Alert) + Send + Sync>>>,
418}
419
420impl ThresholdMonitoringSystem {
421    /// Create new monitoring system
422    pub fn new() -> Self {
423        Self {
424            monitors: Arc::new(DashMap::new()),
425            alert_handlers: Arc::new(DashMap::new()),
426        }
427    }
428
429    /// Register metric with configuration
430    pub fn register_metric(&self, name: impl Into<String>, config: ThresholdConfig) -> Result<()> {
431        let name = name.into();
432        let monitor = MetricMonitor::new(config)?;
433        self.monitors.insert(name, monitor);
434        Ok(())
435    }
436
437    /// Record metric value and check for alerts
438    pub fn record(&self, metric_name: &str, value: f64) -> Vec<Alert> {
439        if let Some(mut monitor) = self.monitors.get_mut(metric_name) {
440            let alerts = monitor.check_value(metric_name, value);
441
442            // Trigger alert handlers
443            for alert in &alerts {
444                if let Some(handler) = self.alert_handlers.get(metric_name) {
445                    handler(alert);
446                }
447            }
448
449            alerts
450        } else {
451            Vec::new()
452        }
453    }
454
455    /// Get recent alerts for a metric
456    pub fn get_recent_alerts(&self, metric_name: &str) -> Vec<Alert> {
457        self.monitors
458            .get(metric_name)
459            .map(|m| m.recent_alerts.clone())
460            .unwrap_or_default()
461    }
462
463    /// Clear alerts for a metric
464    pub fn clear_alerts(&self, metric_name: &str) {
465        if let Some(mut monitor) = self.monitors.get_mut(metric_name) {
466            monitor.recent_alerts.clear();
467        }
468    }
469
470    /// Reset monitoring for a metric
471    pub fn reset_metric(&self, metric_name: &str) -> Result<()> {
472        if let Some(mut entry) = self.monitors.get_mut(metric_name) {
473            let config = entry.config.clone();
474            let new_monitor = MetricMonitor::new(config)?;
475            *entry = new_monitor;
476            Ok(())
477        } else {
478            Err(DecisionError::InvalidParameter(format!(
479                "Metric {} not found",
480                metric_name
481            )))
482        }
483    }
484
485    /// Get all monitored metrics
486    pub fn get_metrics(&self) -> Vec<String> {
487        self.monitors.iter().map(|e| e.key().clone()).collect()
488    }
489
490    /// Check if metric is registered
491    pub fn has_metric(&self, metric_name: &str) -> bool {
492        self.monitors.contains_key(metric_name)
493    }
494}
495
496impl Default for ThresholdMonitoringSystem {
497    fn default() -> Self {
498        Self::new()
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    /// Threshold-violation alerts contained in `alerts`, in order.
    fn violations(alerts: &[Alert]) -> Vec<&Alert> {
        alerts
            .iter()
            .filter(|a| a.alert_type == AlertType::ThresholdViolation)
            .collect()
    }

    #[test]
    fn test_alert_creation() {
        let alert = Alert::new(
            "quality",
            AlertType::ThresholdViolation,
            AlertSeverity::Warning,
            "Low quality",
            0.6,
            Some(0.7),
        );

        assert_eq!(alert.metric_name, "quality");
        assert_eq!(alert.alert_type, AlertType::ThresholdViolation);
        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.value, 0.6);
        assert_eq!(alert.threshold, Some(0.7));
        assert!(alert.context.is_empty());
    }

    #[test]
    fn test_alert_with_context_and_unique_ids() {
        let a = Alert::new("m", AlertType::Anomaly, AlertSeverity::Info, "x", 1.0, None)
            .with_context("ctx");
        let b = Alert::new("m", AlertType::Anomaly, AlertSeverity::Info, "x", 1.0, None);

        assert_eq!(a.context, "ctx");
        assert!(b.context.is_empty());
        assert_ne!(a.id, b.id);
    }

    #[test]
    fn test_threshold_config_quality() {
        let config = ThresholdConfig::quality();
        assert_eq!(config.min_value, Some(0.5));
        assert!(config.enable_drift_detection);
        assert!(config.enable_anomaly_detection);
    }

    #[test]
    fn test_threshold_config_cost() {
        let config = ThresholdConfig::cost(1.0);
        assert_eq!(config.min_value, Some(0.0));
        assert_eq!(config.max_value, Some(1.0));
        assert_eq!(config.warning_max, Some(0.8));
        assert_eq!(config.drift_algorithm, DriftAlgorithm::PageHinkley);
    }

    #[test]
    fn test_threshold_config_latency() {
        let config = ThresholdConfig::latency(5000.0);
        assert_eq!(config.max_value, Some(5000.0));
        assert_eq!(config.warning_max, Some(4000.0));
        assert_eq!(config.drift_algorithm, DriftAlgorithm::CUSUM);
    }

    #[test]
    fn test_monitoring_system_creation() {
        let system = ThresholdMonitoringSystem::new();
        assert_eq!(system.get_metrics().len(), 0);
    }

    #[test]
    fn test_register_metric() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig::quality();

        system.register_metric("quality", config).unwrap();
        assert!(system.has_metric("quality"));
        assert_eq!(system.get_metrics().len(), 1);
    }

    #[test]
    fn test_threshold_violation() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            max_value: Some(1.0),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Below minimum: exactly one critical violation against the minimum.
        let alerts = system.record("quality", 0.5);
        let below = violations(&alerts);
        assert_eq!(below.len(), 1);
        assert_eq!(below[0].severity, AlertSeverity::Critical);
        assert_eq!(below[0].threshold, Some(0.7));

        // Above maximum: exactly one critical violation against the maximum.
        let alerts = system.record("quality", 1.5);
        let above = violations(&alerts);
        assert_eq!(above.len(), 1);
        assert_eq!(above[0].severity, AlertSeverity::Critical);
        assert_eq!(above[0].threshold, Some(1.0));
    }

    #[test]
    fn test_warning_threshold() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.5),
            warning_min: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // In warning zone: one warning, no critical violation.
        let alerts = system.record("quality", 0.6);
        let found = violations(&alerts);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].severity, AlertSeverity::Warning);
        assert_eq!(found[0].threshold, Some(0.7));
    }

    #[test]
    fn test_critical_suppresses_warning() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.5),
            warning_min: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Below both limits: only the critical alert is raised.
        let alerts = system.record("quality", 0.3);
        let found = violations(&alerts);
        assert_eq!(found.len(), 1);
        assert_eq!(found[0].severity, AlertSeverity::Critical);
        assert_eq!(found[0].threshold, Some(0.5));
    }

    #[test]
    fn test_drift_detection() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            enable_drift_detection: true,
            drift_algorithm: DriftAlgorithm::ADWIN,
            enable_anomaly_detection: false,
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Add stable data
        for _ in 0..30 {
            system.record("quality", 0.9);
        }

        // Add drifted data
        let mut drift_detected = false;
        for _ in 0..30 {
            let alerts = system.record("quality", 0.5);
            if alerts.iter().any(|a| a.alert_type == AlertType::Drift) {
                drift_detected = true;
                break;
            }
        }

        assert!(drift_detected);
    }

    #[test]
    fn test_anomaly_detection() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            enable_drift_detection: false,
            enable_anomaly_detection: true,
            anomaly_threshold: 3.0,
            ..Default::default()
        };

        system.register_metric("latency", config).unwrap();

        // Add normal data
        for _ in 0..30 {
            system.record("latency", 1000.0);
        }

        // Add anomaly
        let alerts = system.record("latency", 5000.0);
        assert!(alerts.iter().any(|a| a.alert_type == AlertType::Anomaly));
    }

    #[test]
    fn test_recent_alerts() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Generate alerts
        system.record("quality", 0.5);
        system.record("quality", 0.4);

        let recent = system.get_recent_alerts("quality");
        assert_eq!(violations(&recent).len(), 2);
    }

    #[test]
    fn test_recent_alerts_are_bounded() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            enable_drift_detection: false,
            enable_anomaly_detection: false,
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        for _ in 0..150 {
            system.record("quality", 0.5);
        }

        assert_eq!(system.get_recent_alerts("quality").len(), 100);
    }

    #[test]
    fn test_clear_alerts() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        system.record("quality", 0.5);
        assert!(!system.get_recent_alerts("quality").is_empty());

        system.clear_alerts("quality");
        assert!(system.get_recent_alerts("quality").is_empty());
    }

    #[test]
    fn test_reset_metric() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig::quality();

        system.register_metric("quality", config).unwrap();

        // Add data
        for _ in 0..20 {
            system.record("quality", 0.9);
        }
        // A hard violation guarantees there is history for the reset to clear.
        system.record("quality", 0.3);
        assert!(!system.get_recent_alerts("quality").is_empty());

        // Reset
        system.reset_metric("quality").unwrap();

        // Alerts should be cleared, metric still registered
        assert!(system.get_recent_alerts("quality").is_empty());
        assert!(system.has_metric("quality"));
    }

    #[test]
    fn test_reset_unknown_metric() {
        let system = ThresholdMonitoringSystem::new();
        assert!(system.reset_metric("missing").is_err());
    }

    #[test]
    fn test_unregistered_metric() {
        let system = ThresholdMonitoringSystem::new();
        let alerts = system.record("unknown", 1.0);
        assert!(alerts.is_empty());
    }

    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Critical > AlertSeverity::Warning);
        assert!(AlertSeverity::Warning > AlertSeverity::Info);
    }
}