1use chrono::{DateTime, Utc};
7use dashmap::DashMap;
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10use uuid::Uuid;
11
12use crate::{
13 anomaly_detection::{AnomalyResult, MADDetector, ZScoreDetector},
14 drift_detection::{DriftAlgorithm, DriftStatus, PageHinkley, ADWIN, CUSUM},
15 errors::{DecisionError, Result},
16};
17
18#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
20pub enum AlertSeverity {
21 Info,
23 Warning,
25 Critical,
27}
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
31pub enum AlertType {
32 ThresholdViolation,
34 Drift,
36 Anomaly,
38 PerformanceDegradation,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct Alert {
45 pub id: Uuid,
47 pub metric_name: String,
49 pub alert_type: AlertType,
51 pub severity: AlertSeverity,
53 pub message: String,
55 pub value: f64,
57 pub threshold: Option<f64>,
59 pub timestamp: DateTime<Utc>,
61 pub context: String,
63}
64
65impl Alert {
66 pub fn new(
68 metric_name: impl Into<String>,
69 alert_type: AlertType,
70 severity: AlertSeverity,
71 message: impl Into<String>,
72 value: f64,
73 threshold: Option<f64>,
74 ) -> Self {
75 Self {
76 id: Uuid::new_v4(),
77 metric_name: metric_name.into(),
78 alert_type,
79 severity,
80 message: message.into(),
81 value,
82 threshold,
83 timestamp: Utc::now(),
84 context: String::new(),
85 }
86 }
87
88 pub fn with_context(mut self, context: impl Into<String>) -> Self {
90 self.context = context.into();
91 self
92 }
93}
94
95#[derive(Debug, Clone, Serialize, Deserialize)]
97pub struct ThresholdConfig {
98 pub min_value: Option<f64>,
100 pub max_value: Option<f64>,
102 pub warning_min: Option<f64>,
104 pub warning_max: Option<f64>,
106 pub enable_drift_detection: bool,
108 pub drift_algorithm: DriftAlgorithm,
110 pub enable_anomaly_detection: bool,
112 pub anomaly_threshold: f64,
114}
115
116impl Default for ThresholdConfig {
117 fn default() -> Self {
118 Self {
119 min_value: None,
120 max_value: None,
121 warning_min: None,
122 warning_max: None,
123 enable_drift_detection: true,
124 drift_algorithm: DriftAlgorithm::ADWIN,
125 enable_anomaly_detection: true,
126 anomaly_threshold: 3.0,
127 }
128 }
129}
130
131impl ThresholdConfig {
132 pub fn quality() -> Self {
134 Self {
135 min_value: Some(0.5),
136 max_value: Some(1.0),
137 warning_min: Some(0.7),
138 warning_max: None,
139 enable_drift_detection: true,
140 drift_algorithm: DriftAlgorithm::ADWIN,
141 enable_anomaly_detection: true,
142 anomaly_threshold: 2.5,
143 }
144 }
145
146 pub fn cost(max_cost: f64) -> Self {
148 Self {
149 min_value: Some(0.0),
150 max_value: Some(max_cost),
151 warning_min: None,
152 warning_max: Some(max_cost * 0.8),
153 enable_drift_detection: true,
154 drift_algorithm: DriftAlgorithm::PageHinkley,
155 enable_anomaly_detection: true,
156 anomaly_threshold: 3.0,
157 }
158 }
159
160 pub fn latency(max_latency: f64) -> Self {
162 Self {
163 min_value: Some(0.0),
164 max_value: Some(max_latency),
165 warning_min: None,
166 warning_max: Some(max_latency * 0.8),
167 enable_drift_detection: true,
168 drift_algorithm: DriftAlgorithm::CUSUM,
169 enable_anomaly_detection: true,
170 anomaly_threshold: 2.5,
171 }
172 }
173}
174
175struct MetricMonitor {
177 config: ThresholdConfig,
179 adwin: Option<ADWIN>,
181 page_hinkley: Option<PageHinkley>,
183 cusum: Option<CUSUM>,
185 zscore: Option<ZScoreDetector>,
187 mad: Option<MADDetector>,
189 recent_alerts: Vec<Alert>,
191 sample_count: u64,
193}
194
195impl MetricMonitor {
196 fn new(config: ThresholdConfig) -> Result<Self> {
198 let adwin = if config.enable_drift_detection
199 && config.drift_algorithm == DriftAlgorithm::ADWIN
200 {
201 Some(ADWIN::new(0.002, 100)?)
202 } else {
203 None
204 };
205
206 let page_hinkley = if config.enable_drift_detection
207 && config.drift_algorithm == DriftAlgorithm::PageHinkley
208 {
209 Some(PageHinkley::new(10.0, 0.005)?)
210 } else {
211 None
212 };
213
214 let cusum = if config.enable_drift_detection
215 && config.drift_algorithm == DriftAlgorithm::CUSUM
216 {
217 Some(CUSUM::new(5.0, 0.5, 0.1)?)
219 } else {
220 None
221 };
222
223 let zscore = if config.enable_anomaly_detection {
224 Some(ZScoreDetector::new(50, config.anomaly_threshold)?)
225 } else {
226 None
227 };
228
229 let mad = if config.enable_anomaly_detection {
230 Some(MADDetector::new(50, config.anomaly_threshold)?)
231 } else {
232 None
233 };
234
235 Ok(Self {
236 config,
237 adwin,
238 page_hinkley,
239 cusum,
240 zscore,
241 mad,
242 recent_alerts: Vec::new(),
243 sample_count: 0,
244 })
245 }
246
247 fn check_value(&mut self, metric_name: &str, value: f64) -> Vec<Alert> {
249 let mut alerts = Vec::new();
250 self.sample_count += 1;
251
252 if let Some(min) = self.config.min_value {
254 if value < min {
255 alerts.push(
256 Alert::new(
257 metric_name,
258 AlertType::ThresholdViolation,
259 AlertSeverity::Critical,
260 format!("Value {} below minimum threshold {}", value, min),
261 value,
262 Some(min),
263 )
264 .with_context(format!("Sample count: {}", self.sample_count)),
265 );
266 }
267 }
268
269 if let Some(max) = self.config.max_value {
270 if value > max {
271 alerts.push(
272 Alert::new(
273 metric_name,
274 AlertType::ThresholdViolation,
275 AlertSeverity::Critical,
276 format!("Value {} exceeds maximum threshold {}", value, max),
277 value,
278 Some(max),
279 )
280 .with_context(format!("Sample count: {}", self.sample_count)),
281 );
282 }
283 }
284
285 if let Some(warning_min) = self.config.warning_min {
287 if value < warning_min && !alerts.iter().any(|a| matches!(a.alert_type, AlertType::ThresholdViolation)) {
288 alerts.push(
289 Alert::new(
290 metric_name,
291 AlertType::ThresholdViolation,
292 AlertSeverity::Warning,
293 format!("Value {} below warning threshold {}", value, warning_min),
294 value,
295 Some(warning_min),
296 )
297 .with_context(format!("Sample count: {}", self.sample_count)),
298 );
299 }
300 }
301
302 if let Some(warning_max) = self.config.warning_max {
303 if value > warning_max && !alerts.iter().any(|a| matches!(a.alert_type, AlertType::ThresholdViolation)) {
304 alerts.push(
305 Alert::new(
306 metric_name,
307 AlertType::ThresholdViolation,
308 AlertSeverity::Warning,
309 format!("Value {} exceeds warning threshold {}", value, warning_max),
310 value,
311 Some(warning_max),
312 )
313 .with_context(format!("Sample count: {}", self.sample_count)),
314 );
315 }
316 }
317
318 if self.config.enable_drift_detection {
320 let drift_status = if let Some(adwin) = &mut self.adwin {
321 adwin.add(value)
322 } else if let Some(ph) = &mut self.page_hinkley {
323 ph.add(value)
324 } else if let Some(cusum) = &mut self.cusum {
325 cusum.add(value)
326 } else {
327 DriftStatus::Stable
328 };
329
330 match drift_status {
331 DriftStatus::Drift => {
332 alerts.push(
333 Alert::new(
334 metric_name,
335 AlertType::Drift,
336 AlertSeverity::Critical,
337 format!(
338 "Drift detected using {:?}",
339 self.config.drift_algorithm
340 ),
341 value,
342 None,
343 )
344 .with_context(format!("Sample count: {}", self.sample_count)),
345 );
346 }
347 DriftStatus::Warning => {
348 alerts.push(
349 Alert::new(
350 metric_name,
351 AlertType::Drift,
352 AlertSeverity::Warning,
353 "Possible drift detected".to_string(),
354 value,
355 None,
356 )
357 .with_context(format!("Sample count: {}", self.sample_count)),
358 );
359 }
360 DriftStatus::Stable => {}
361 }
362 }
363
364 if self.config.enable_anomaly_detection {
366 let anomaly_result = if let Some(zscore) = &mut self.zscore {
367 zscore.add(value)
368 } else if let Some(mad) = &mut self.mad {
369 mad.add(value)
370 } else {
371 AnomalyResult::normal(0.0)
372 };
373
374 if anomaly_result.is_anomaly {
375 let severity = if anomaly_result.severity > 0.7 {
376 AlertSeverity::Critical
377 } else {
378 AlertSeverity::Warning
379 };
380
381 alerts.push(
382 Alert::new(
383 metric_name,
384 AlertType::Anomaly,
385 severity,
386 format!(
387 "Anomaly detected (score: {:.2}, severity: {:.2})",
388 anomaly_result.score, anomaly_result.severity
389 ),
390 value,
391 Some(self.config.anomaly_threshold),
392 )
393 .with_context(format!("Sample count: {}", self.sample_count)),
394 );
395 }
396 }
397
398 for alert in &alerts {
400 self.recent_alerts.push(alert.clone());
401 }
402
403 if self.recent_alerts.len() > 100 {
405 self.recent_alerts.drain(0..self.recent_alerts.len() - 100);
406 }
407
408 alerts
409 }
410}
411
412pub struct ThresholdMonitoringSystem {
414 monitors: Arc<DashMap<String, MetricMonitor>>,
416 alert_handlers: Arc<DashMap<String, Box<dyn Fn(&Alert) + Send + Sync>>>,
418}
419
420impl ThresholdMonitoringSystem {
421 pub fn new() -> Self {
423 Self {
424 monitors: Arc::new(DashMap::new()),
425 alert_handlers: Arc::new(DashMap::new()),
426 }
427 }
428
429 pub fn register_metric(&self, name: impl Into<String>, config: ThresholdConfig) -> Result<()> {
431 let name = name.into();
432 let monitor = MetricMonitor::new(config)?;
433 self.monitors.insert(name, monitor);
434 Ok(())
435 }
436
437 pub fn record(&self, metric_name: &str, value: f64) -> Vec<Alert> {
439 if let Some(mut monitor) = self.monitors.get_mut(metric_name) {
440 let alerts = monitor.check_value(metric_name, value);
441
442 for alert in &alerts {
444 if let Some(handler) = self.alert_handlers.get(metric_name) {
445 handler(alert);
446 }
447 }
448
449 alerts
450 } else {
451 Vec::new()
452 }
453 }
454
455 pub fn get_recent_alerts(&self, metric_name: &str) -> Vec<Alert> {
457 self.monitors
458 .get(metric_name)
459 .map(|m| m.recent_alerts.clone())
460 .unwrap_or_default()
461 }
462
463 pub fn clear_alerts(&self, metric_name: &str) {
465 if let Some(mut monitor) = self.monitors.get_mut(metric_name) {
466 monitor.recent_alerts.clear();
467 }
468 }
469
470 pub fn reset_metric(&self, metric_name: &str) -> Result<()> {
472 if let Some(mut entry) = self.monitors.get_mut(metric_name) {
473 let config = entry.config.clone();
474 let new_monitor = MetricMonitor::new(config)?;
475 *entry = new_monitor;
476 Ok(())
477 } else {
478 Err(DecisionError::InvalidParameter(format!(
479 "Metric {} not found",
480 metric_name
481 )))
482 }
483 }
484
485 pub fn get_metrics(&self) -> Vec<String> {
487 self.monitors.iter().map(|e| e.key().clone()).collect()
488 }
489
490 pub fn has_metric(&self, metric_name: &str) -> bool {
492 self.monitors.contains_key(metric_name)
493 }
494}
495
496impl Default for ThresholdMonitoringSystem {
497 fn default() -> Self {
498 Self::new()
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_alert_creation() {
        let alert = Alert::new(
            "quality",
            AlertType::ThresholdViolation,
            AlertSeverity::Warning,
            "Low quality",
            0.6,
            Some(0.7),
        );

        assert_eq!(alert.metric_name, "quality");
        assert_eq!(alert.alert_type, AlertType::ThresholdViolation);
        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.value, 0.6);
        assert_eq!(alert.threshold, Some(0.7));
        assert!(alert.context.is_empty());
    }

    #[test]
    fn test_threshold_config_quality() {
        let config = ThresholdConfig::quality();
        assert_eq!(config.min_value, Some(0.5));
        assert!(config.enable_drift_detection);
        assert!(config.enable_anomaly_detection);
    }

    #[test]
    fn test_threshold_config_cost() {
        let config = ThresholdConfig::cost(1.0);
        assert_eq!(config.max_value, Some(1.0));
        assert_eq!(config.warning_max, Some(0.8));
    }

    #[test]
    fn test_threshold_config_latency() {
        let config = ThresholdConfig::latency(5000.0);
        assert_eq!(config.max_value, Some(5000.0));
        assert_eq!(config.warning_max, Some(4000.0));
        assert_eq!(config.drift_algorithm, DriftAlgorithm::CUSUM);
    }

    #[test]
    fn test_monitoring_system_creation() {
        let system = ThresholdMonitoringSystem::new();
        assert_eq!(system.get_metrics().len(), 0);
    }

    #[test]
    fn test_register_metric() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig::quality();

        system.register_metric("quality", config).unwrap();
        assert!(system.has_metric("quality"));
        assert_eq!(system.get_metrics().len(), 1);
    }

    #[test]
    fn test_threshold_violation() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            max_value: Some(1.0),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Below the minimum: Critical alert against the min bound.
        let alerts = system.record("quality", 0.5);
        assert!(alerts.iter().any(|a| a.alert_type == AlertType::ThresholdViolation
            && a.severity == AlertSeverity::Critical
            && a.threshold == Some(0.7)));

        system.clear_alerts("quality");

        // Above the maximum: Critical alert against the max bound.
        let alerts = system.record("quality", 1.5);
        assert!(alerts.iter().any(|a| a.alert_type == AlertType::ThresholdViolation
            && a.severity == AlertSeverity::Critical
            && a.threshold == Some(1.0)));
    }

    #[test]
    fn test_warning_threshold() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.5),
            warning_min: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Inside the warning band but above the hard minimum.
        let alerts = system.record("quality", 0.6);
        assert!(alerts.iter().any(|a| a.alert_type == AlertType::ThresholdViolation
            && a.severity == AlertSeverity::Warning
            && a.threshold == Some(0.7)));
        assert!(!alerts.iter().any(|a| a.alert_type == AlertType::ThresholdViolation
            && a.severity == AlertSeverity::Critical));
    }

    #[test]
    fn test_drift_detection() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            enable_drift_detection: true,
            drift_algorithm: DriftAlgorithm::ADWIN,
            enable_anomaly_detection: false,
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        // Stable phase.
        for _ in 0..30 {
            system.record("quality", 0.9);
        }

        // Shifted phase: drift should be reported within 30 samples.
        let mut drift_detected = false;
        for _ in 0..30 {
            let alerts = system.record("quality", 0.5);
            if alerts.iter().any(|a| a.alert_type == AlertType::Drift) {
                drift_detected = true;
                break;
            }
        }

        assert!(drift_detected);
    }

    #[test]
    fn test_anomaly_detection() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            enable_drift_detection: false,
            enable_anomaly_detection: true,
            anomaly_threshold: 3.0,
            ..Default::default()
        };

        system.register_metric("latency", config).unwrap();

        for _ in 0..30 {
            system.record("latency", 1000.0);
        }

        let alerts = system.record("latency", 5000.0);
        assert!(alerts.iter().any(|a| a.alert_type == AlertType::Anomaly));
    }

    #[test]
    fn test_recent_alerts() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        system.record("quality", 0.5);
        system.record("quality", 0.4);

        let recent = system.get_recent_alerts("quality");
        assert!(!recent.is_empty());
    }

    #[test]
    fn test_recent_alerts_are_capped() {
        let system = ThresholdMonitoringSystem::new();
        // Only the min bound is active, so each sample below it raises exactly one alert.
        let config = ThresholdConfig {
            min_value: Some(0.7),
            enable_drift_detection: false,
            enable_anomaly_detection: false,
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        for _ in 0..150 {
            assert_eq!(system.record("quality", 0.4).len(), 1);
        }

        assert_eq!(system.get_recent_alerts("quality").len(), 100);
    }

    #[test]
    fn test_recent_alerts_unknown_metric() {
        let system = ThresholdMonitoringSystem::new();
        assert!(system.get_recent_alerts("unknown").is_empty());
    }

    #[test]
    fn test_clear_alerts() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig {
            min_value: Some(0.7),
            ..Default::default()
        };

        system.register_metric("quality", config).unwrap();

        system.record("quality", 0.5);
        assert!(!system.get_recent_alerts("quality").is_empty());

        system.clear_alerts("quality");
        assert!(system.get_recent_alerts("quality").is_empty());
    }

    #[test]
    fn test_reset_metric() {
        let system = ThresholdMonitoringSystem::new();
        let config = ThresholdConfig::quality();

        system.register_metric("quality", config).unwrap();

        // 0.4 is below the quality preset's hard minimum (0.5), so history is non-empty
        // before the reset and the assertion below actually exercises `reset_metric`.
        system.record("quality", 0.4);
        assert!(!system.get_recent_alerts("quality").is_empty());

        for _ in 0..20 {
            system.record("quality", 0.9);
        }

        system.reset_metric("quality").unwrap();

        assert!(system.has_metric("quality"));
        assert!(system.get_recent_alerts("quality").is_empty());
    }

    #[test]
    fn test_reset_unknown_metric() {
        let system = ThresholdMonitoringSystem::new();
        assert!(system.reset_metric("missing").is_err());
    }

    #[test]
    fn test_unregistered_metric() {
        let system = ThresholdMonitoringSystem::new();
        let alerts = system.record("unknown", 1.0);
        assert!(alerts.is_empty());
    }

    #[test]
    fn test_alert_severity_ordering() {
        assert!(AlertSeverity::Critical > AlertSeverity::Warning);
        assert!(AlertSeverity::Warning > AlertSeverity::Info);
    }
}