systemprompt_analytics/services/
anomaly_detection.rs1use std::collections::HashMap;
10use std::sync::Arc;
11
12use chrono::{DateTime, Duration, Utc};
13use tokio::sync::RwLock;
14
/// Warning/critical thresholds for a single metric.
///
/// Both bounds are inclusive when a value is classified: a value at or above
/// `critical_threshold` is critical, otherwise a value at or above
/// `warning_threshold` is a warning.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AnomalyThresholdConfig {
    /// Lowest value that is reported as a warning.
    pub warning_threshold: f64,
    /// Lowest value that is reported as critical.
    pub critical_threshold: f64,
}
22
23#[derive(Debug, Clone)]
24pub struct AnomalyEvent {
25 pub timestamp: DateTime<Utc>,
26 pub value: f64,
27}
28
29impl Copy for AnomalyEvent {}
30
/// Severity assigned to a metric observation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AnomalyLevel {
    /// Below every configured threshold, or no threshold is configured.
    Normal,
    /// Reached the warning threshold, or a moderate trend spike was observed.
    Warning,
    /// Reached the critical threshold, or a large trend spike was observed.
    Critical,
}
37
38#[derive(Debug, Clone)]
39pub struct AnomalyCheckResult {
40 pub metric_name: String,
41 pub current_value: f64,
42 pub level: AnomalyLevel,
43 pub message: Option<String>,
44}
45
46#[derive(Clone, Debug)]
47pub struct AnomalyDetectionService {
48 thresholds: Arc<RwLock<HashMap<String, AnomalyThresholdConfig>>>,
49 recent_events: Arc<RwLock<HashMap<String, Vec<AnomalyEvent>>>>,
50}
51
52impl AnomalyDetectionService {
53 pub fn new() -> Self {
54 Self {
55 thresholds: Arc::new(RwLock::new(Self::default_thresholds())),
56 recent_events: Arc::new(RwLock::new(HashMap::new())),
57 }
58 }
59
60 fn default_thresholds() -> HashMap<String, AnomalyThresholdConfig> {
61 let mut thresholds = HashMap::new();
62
63 thresholds.insert(
64 "requests_per_minute".into(),
65 AnomalyThresholdConfig {
66 warning_threshold: 15.0,
67 critical_threshold: 30.0,
68 },
69 );
70
71 thresholds.insert(
72 "session_count_per_fingerprint".into(),
73 AnomalyThresholdConfig {
74 warning_threshold: 5.0,
75 critical_threshold: 10.0,
76 },
77 );
78
79 thresholds.insert(
80 "error_rate".into(),
81 AnomalyThresholdConfig {
82 warning_threshold: 0.1,
83 critical_threshold: 0.25,
84 },
85 );
86
87 thresholds
88 }
89
90 pub async fn check_anomaly(&self, metric_name: &str, value: f64) -> AnomalyCheckResult {
91 let level = {
92 let thresholds = self.thresholds.read().await;
93 thresholds
94 .get(metric_name)
95 .map_or(AnomalyLevel::Normal, |t| {
96 if value >= t.critical_threshold {
97 AnomalyLevel::Critical
98 } else if value >= t.warning_threshold {
99 AnomalyLevel::Warning
100 } else {
101 AnomalyLevel::Normal
102 }
103 })
104 };
105
106 let message = match level {
107 AnomalyLevel::Critical => Some(format!(
108 "CRITICAL: {metric_name} = {value:.2} exceeds critical threshold"
109 )),
110 AnomalyLevel::Warning => Some(format!(
111 "WARNING: {metric_name} = {value:.2} exceeds warning threshold"
112 )),
113 AnomalyLevel::Normal => None,
114 };
115
116 AnomalyCheckResult {
117 metric_name: metric_name.to_owned(),
118 current_value: value,
119 level,
120 message,
121 }
122 }
123
124 pub async fn record_event(&self, metric_name: &str, value: f64) {
125 let now = Utc::now();
126 let cutoff = now - Duration::hours(1);
127 let key = metric_name.to_owned();
128 let event = AnomalyEvent {
129 timestamp: now,
130 value,
131 };
132
133 let mut events = self.recent_events.write().await;
134 events.entry(key).or_default().push(event);
135 if let Some(metric_events) = events.get_mut(metric_name) {
136 metric_events.retain(|e| e.timestamp > cutoff);
137 }
138 }
139
140 pub async fn check_trend_anomaly(
141 &self,
142 metric_name: &str,
143 window_minutes: i64,
144 ) -> Option<AnomalyCheckResult> {
145 let metric_events = {
146 let events = self.recent_events.read().await;
147 events.get(metric_name).cloned()?
148 };
149
150 if metric_events.len() < 2 {
151 return None;
152 }
153
154 let cutoff = Utc::now() - Duration::minutes(window_minutes);
155 let recent: Vec<_> = metric_events
156 .iter()
157 .filter(|e| e.timestamp > cutoff)
158 .collect();
159
160 if recent.is_empty() {
161 return None;
162 }
163
164 let avg: f64 = recent.iter().map(|e| e.value).sum::<f64>() / recent.len() as f64;
165 let latest = recent.last()?.value;
166 let spike_ratio = if avg > 0.0 { latest / avg } else { 1.0 };
167
168 if spike_ratio > 3.0 {
169 Some(AnomalyCheckResult {
170 metric_name: metric_name.to_owned(),
171 current_value: latest,
172 level: AnomalyLevel::Critical,
173 message: Some(format!(
174 "Spike: {metric_name} jumped {spike_ratio:.1}x above average"
175 )),
176 })
177 } else if spike_ratio > 2.0 {
178 Some(AnomalyCheckResult {
179 metric_name: metric_name.to_owned(),
180 current_value: latest,
181 level: AnomalyLevel::Warning,
182 message: Some(format!(
183 "Elevated: {metric_name} is {spike_ratio:.1}x above average"
184 )),
185 })
186 } else {
187 None
188 }
189 }
190
191 pub async fn update_threshold(&self, metric_name: &str, warning: f64, critical: f64) {
192 let mut thresholds = self.thresholds.write().await;
193 thresholds.insert(
194 metric_name.to_owned(),
195 AnomalyThresholdConfig {
196 warning_threshold: warning,
197 critical_threshold: critical,
198 },
199 );
200 }
201
202 pub async fn get_recent_events(&self, metric_name: &str) -> Vec<AnomalyEvent> {
203 let events = self.recent_events.read().await;
204 events.get(metric_name).cloned().unwrap_or_else(Vec::new)
205 }
206}
207
208impl Default for AnomalyDetectionService {
209 fn default() -> Self {
210 Self::new()
211 }
212}