Skip to main content

systemprompt_analytics/services/
anomaly_detection.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::{DateTime, Duration, Utc};
5use tokio::sync::RwLock;
6
/// Warning and critical limits for a single metric.
///
/// A value is compared against these limits with `>=`, so a value exactly
/// equal to a threshold already counts as having reached that level.
#[derive(Debug, Clone, Copy)]
pub struct AnomalyThresholdConfig {
    /// Value at or above which the metric is reported as a warning.
    pub warning_threshold: f64,
    /// Value at or above which the metric is reported as critical.
    pub critical_threshold: f64,
}
14
15#[derive(Debug, Clone)]
16pub struct AnomalyEvent {
17    pub timestamp: DateTime<Utc>,
18    pub value: f64,
19}
20
21impl Copy for AnomalyEvent {}
22
/// Severity assigned to a metric observation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AnomalyLevel {
    /// No anomaly was detected.
    Normal,
    /// The warning limit was reached, but not the critical one.
    Warning,
    /// The critical limit was reached.
    Critical,
}
29
30#[derive(Debug, Clone)]
31pub struct AnomalyCheckResult {
32    pub metric_name: String,
33    pub current_value: f64,
34    pub level: AnomalyLevel,
35    pub message: Option<String>,
36}
37
38#[derive(Clone, Debug)]
39pub struct AnomalyDetectionService {
40    thresholds: Arc<RwLock<HashMap<String, AnomalyThresholdConfig>>>,
41    recent_events: Arc<RwLock<HashMap<String, Vec<AnomalyEvent>>>>,
42}
43
44impl AnomalyDetectionService {
45    pub fn new() -> Self {
46        Self {
47            thresholds: Arc::new(RwLock::new(Self::default_thresholds())),
48            recent_events: Arc::new(RwLock::new(HashMap::new())),
49        }
50    }
51
52    fn default_thresholds() -> HashMap<String, AnomalyThresholdConfig> {
53        let mut thresholds = HashMap::new();
54
55        thresholds.insert(
56            "requests_per_minute".into(),
57            AnomalyThresholdConfig {
58                warning_threshold: 15.0,
59                critical_threshold: 30.0,
60            },
61        );
62
63        thresholds.insert(
64            "session_count_per_fingerprint".into(),
65            AnomalyThresholdConfig {
66                warning_threshold: 5.0,
67                critical_threshold: 10.0,
68            },
69        );
70
71        thresholds.insert(
72            "error_rate".into(),
73            AnomalyThresholdConfig {
74                warning_threshold: 0.1,
75                critical_threshold: 0.25,
76            },
77        );
78
79        thresholds
80    }
81
82    pub async fn check_anomaly(&self, metric_name: &str, value: f64) -> AnomalyCheckResult {
83        let level = {
84            let thresholds = self.thresholds.read().await;
85            thresholds
86                .get(metric_name)
87                .map_or(AnomalyLevel::Normal, |t| {
88                    if value >= t.critical_threshold {
89                        AnomalyLevel::Critical
90                    } else if value >= t.warning_threshold {
91                        AnomalyLevel::Warning
92                    } else {
93                        AnomalyLevel::Normal
94                    }
95                })
96        };
97
98        let message = match level {
99            AnomalyLevel::Critical => Some(format!(
100                "CRITICAL: {metric_name} = {value:.2} exceeds critical threshold"
101            )),
102            AnomalyLevel::Warning => Some(format!(
103                "WARNING: {metric_name} = {value:.2} exceeds warning threshold"
104            )),
105            AnomalyLevel::Normal => None,
106        };
107
108        AnomalyCheckResult {
109            metric_name: metric_name.to_string(),
110            current_value: value,
111            level,
112            message,
113        }
114    }
115
116    pub async fn record_event(&self, metric_name: &str, value: f64) {
117        let now = Utc::now();
118        let cutoff = now - Duration::hours(1);
119        let key = metric_name.to_string();
120        let event = AnomalyEvent {
121            timestamp: now,
122            value,
123        };
124
125        let mut events = self.recent_events.write().await;
126        events.entry(key).or_default().push(event);
127        if let Some(metric_events) = events.get_mut(metric_name) {
128            metric_events.retain(|e| e.timestamp > cutoff);
129        }
130    }
131
132    pub async fn check_trend_anomaly(
133        &self,
134        metric_name: &str,
135        window_minutes: i64,
136    ) -> Option<AnomalyCheckResult> {
137        let metric_events = {
138            let events = self.recent_events.read().await;
139            events.get(metric_name).cloned()?
140        };
141
142        if metric_events.len() < 2 {
143            return None;
144        }
145
146        let cutoff = Utc::now() - Duration::minutes(window_minutes);
147        let recent: Vec<_> = metric_events
148            .iter()
149            .filter(|e| e.timestamp > cutoff)
150            .collect();
151
152        if recent.is_empty() {
153            return None;
154        }
155
156        let avg: f64 = recent.iter().map(|e| e.value).sum::<f64>() / recent.len() as f64;
157        let latest = recent.last()?.value;
158        let spike_ratio = if avg > 0.0 { latest / avg } else { 1.0 };
159
160        if spike_ratio > 3.0 {
161            Some(AnomalyCheckResult {
162                metric_name: metric_name.to_string(),
163                current_value: latest,
164                level: AnomalyLevel::Critical,
165                message: Some(format!(
166                    "Spike: {metric_name} jumped {spike_ratio:.1}x above average"
167                )),
168            })
169        } else if spike_ratio > 2.0 {
170            Some(AnomalyCheckResult {
171                metric_name: metric_name.to_string(),
172                current_value: latest,
173                level: AnomalyLevel::Warning,
174                message: Some(format!(
175                    "Elevated: {metric_name} is {spike_ratio:.1}x above average"
176                )),
177            })
178        } else {
179            None
180        }
181    }
182
183    pub async fn update_threshold(&self, metric_name: &str, warning: f64, critical: f64) {
184        let mut thresholds = self.thresholds.write().await;
185        thresholds.insert(
186            metric_name.to_string(),
187            AnomalyThresholdConfig {
188                warning_threshold: warning,
189                critical_threshold: critical,
190            },
191        );
192    }
193
194    pub async fn get_recent_events(&self, metric_name: &str) -> Vec<AnomalyEvent> {
195        let events = self.recent_events.read().await;
196        events.get(metric_name).cloned().unwrap_or_else(Vec::new)
197    }
198}
199
200impl Default for AnomalyDetectionService {
201    fn default() -> Self {
202        Self::new()
203    }
204}