Skip to main content

systemprompt_analytics/services/
anomaly_detection.rs

1//! In-memory threshold and spike detection for live metrics.
2//!
3//! [`AnomalyDetectionService`] tracks per-metric warning/critical thresholds
4//! and a rolling one-hour event window, classifying each value as
5//! [`AnomalyLevel::Normal`], `Warning`, or `Critical` and detecting trend
6//! spikes relative to the recent average. State is held in memory behind
7//! `RwLock`s, not persisted.
8
9use std::collections::HashMap;
10use std::sync::Arc;
11
12use chrono::{DateTime, Duration, Utc};
13use tokio::sync::RwLock;
14
/// Warning and critical limits for a single metric.
///
/// `AnomalyDetectionService::check_anomaly` compares a value against
/// `critical_threshold` first and `warning_threshold` second, both with `>=`.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AnomalyThresholdConfig {
    /// Values at or above this limit are classified as a warning.
    pub warning_threshold: f64,
    /// Values at or above this limit are classified as critical.
    pub critical_threshold: f64,
}
22
23#[derive(Debug, Clone)]
24pub struct AnomalyEvent {
25    pub timestamp: DateTime<Utc>,
26    pub value: f64,
27}
28
29impl Copy for AnomalyEvent {}
30
/// Severity assigned to a metric value.
///
/// Variants are declared in ascending severity, so the derived ordering gives
/// `Normal < Warning < Critical` and levels can be compared or combined with
/// `max`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum AnomalyLevel {
    /// No threshold was reached.
    Normal,
    /// The warning threshold was reached.
    Warning,
    /// The critical threshold was reached.
    Critical,
}
37
38#[derive(Debug, Clone)]
39pub struct AnomalyCheckResult {
40    pub metric_name: String,
41    pub current_value: f64,
42    pub level: AnomalyLevel,
43    pub message: Option<String>,
44}
45
46#[derive(Clone, Debug)]
47pub struct AnomalyDetectionService {
48    thresholds: Arc<RwLock<HashMap<String, AnomalyThresholdConfig>>>,
49    recent_events: Arc<RwLock<HashMap<String, Vec<AnomalyEvent>>>>,
50}
51
52impl AnomalyDetectionService {
53    pub fn new() -> Self {
54        Self {
55            thresholds: Arc::new(RwLock::new(Self::default_thresholds())),
56            recent_events: Arc::new(RwLock::new(HashMap::new())),
57        }
58    }
59
60    fn default_thresholds() -> HashMap<String, AnomalyThresholdConfig> {
61        let mut thresholds = HashMap::new();
62
63        thresholds.insert(
64            "requests_per_minute".into(),
65            AnomalyThresholdConfig {
66                warning_threshold: 15.0,
67                critical_threshold: 30.0,
68            },
69        );
70
71        thresholds.insert(
72            "session_count_per_fingerprint".into(),
73            AnomalyThresholdConfig {
74                warning_threshold: 5.0,
75                critical_threshold: 10.0,
76            },
77        );
78
79        thresholds.insert(
80            "error_rate".into(),
81            AnomalyThresholdConfig {
82                warning_threshold: 0.1,
83                critical_threshold: 0.25,
84            },
85        );
86
87        thresholds
88    }
89
90    pub async fn check_anomaly(&self, metric_name: &str, value: f64) -> AnomalyCheckResult {
91        let level = {
92            let thresholds = self.thresholds.read().await;
93            thresholds
94                .get(metric_name)
95                .map_or(AnomalyLevel::Normal, |t| {
96                    if value >= t.critical_threshold {
97                        AnomalyLevel::Critical
98                    } else if value >= t.warning_threshold {
99                        AnomalyLevel::Warning
100                    } else {
101                        AnomalyLevel::Normal
102                    }
103                })
104        };
105
106        let message = match level {
107            AnomalyLevel::Critical => Some(format!(
108                "CRITICAL: {metric_name} = {value:.2} exceeds critical threshold"
109            )),
110            AnomalyLevel::Warning => Some(format!(
111                "WARNING: {metric_name} = {value:.2} exceeds warning threshold"
112            )),
113            AnomalyLevel::Normal => None,
114        };
115
116        AnomalyCheckResult {
117            metric_name: metric_name.to_owned(),
118            current_value: value,
119            level,
120            message,
121        }
122    }
123
124    pub async fn record_event(&self, metric_name: &str, value: f64) {
125        let now = Utc::now();
126        let cutoff = now - Duration::hours(1);
127        let key = metric_name.to_owned();
128        let event = AnomalyEvent {
129            timestamp: now,
130            value,
131        };
132
133        let mut events = self.recent_events.write().await;
134        events.entry(key).or_default().push(event);
135        if let Some(metric_events) = events.get_mut(metric_name) {
136            metric_events.retain(|e| e.timestamp > cutoff);
137        }
138    }
139
140    pub async fn check_trend_anomaly(
141        &self,
142        metric_name: &str,
143        window_minutes: i64,
144    ) -> Option<AnomalyCheckResult> {
145        let metric_events = {
146            let events = self.recent_events.read().await;
147            events.get(metric_name).cloned()?
148        };
149
150        if metric_events.len() < 2 {
151            return None;
152        }
153
154        let cutoff = Utc::now() - Duration::minutes(window_minutes);
155        let recent: Vec<_> = metric_events
156            .iter()
157            .filter(|e| e.timestamp > cutoff)
158            .collect();
159
160        if recent.is_empty() {
161            return None;
162        }
163
164        let avg: f64 = recent.iter().map(|e| e.value).sum::<f64>() / recent.len() as f64;
165        let latest = recent.last()?.value;
166        let spike_ratio = if avg > 0.0 { latest / avg } else { 1.0 };
167
168        if spike_ratio > 3.0 {
169            Some(AnomalyCheckResult {
170                metric_name: metric_name.to_owned(),
171                current_value: latest,
172                level: AnomalyLevel::Critical,
173                message: Some(format!(
174                    "Spike: {metric_name} jumped {spike_ratio:.1}x above average"
175                )),
176            })
177        } else if spike_ratio > 2.0 {
178            Some(AnomalyCheckResult {
179                metric_name: metric_name.to_owned(),
180                current_value: latest,
181                level: AnomalyLevel::Warning,
182                message: Some(format!(
183                    "Elevated: {metric_name} is {spike_ratio:.1}x above average"
184                )),
185            })
186        } else {
187            None
188        }
189    }
190
191    pub async fn update_threshold(&self, metric_name: &str, warning: f64, critical: f64) {
192        let mut thresholds = self.thresholds.write().await;
193        thresholds.insert(
194            metric_name.to_owned(),
195            AnomalyThresholdConfig {
196                warning_threshold: warning,
197                critical_threshold: critical,
198            },
199        );
200    }
201
202    pub async fn get_recent_events(&self, metric_name: &str) -> Vec<AnomalyEvent> {
203        let events = self.recent_events.read().await;
204        events.get(metric_name).cloned().unwrap_or_else(Vec::new)
205    }
206}
207
208impl Default for AnomalyDetectionService {
209    fn default() -> Self {
210        Self::new()
211    }
212}