systemprompt_analytics/services/
anomaly_detection.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use chrono::{DateTime, Duration, Utc};
5use tokio::sync::RwLock;
6
/// Warning and critical limits for a single metric.
///
/// A value at or above `critical_threshold` is critical; otherwise a value at or above
/// `warning_threshold` is a warning.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct AnomalyThresholdConfig {
    /// Inclusive lower bound for a warning-level result.
    pub warning_threshold: f64,
    /// Inclusive lower bound for a critical-level result.
    pub critical_threshold: f64,
}
14
15#[derive(Debug, Clone)]
16pub struct AnomalyEvent {
17 pub timestamp: DateTime<Utc>,
18 pub value: f64,
19}
20
21impl Copy for AnomalyEvent {}
22
/// Severity attached to an [`AnomalyCheckResult`], produced by both the static
/// threshold check and the trend (spike) check.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum AnomalyLevel {
    /// No anomaly detected.
    Normal,
    /// Warning-level anomaly.
    Warning,
    /// Critical-level anomaly.
    Critical,
}
29
30#[derive(Debug, Clone)]
31pub struct AnomalyCheckResult {
32 pub metric_name: String,
33 pub current_value: f64,
34 pub level: AnomalyLevel,
35 pub message: Option<String>,
36}
37
38#[derive(Clone, Debug)]
39pub struct AnomalyDetectionService {
40 thresholds: Arc<RwLock<HashMap<String, AnomalyThresholdConfig>>>,
41 recent_events: Arc<RwLock<HashMap<String, Vec<AnomalyEvent>>>>,
42}
43
44impl AnomalyDetectionService {
45 pub fn new() -> Self {
46 Self {
47 thresholds: Arc::new(RwLock::new(Self::default_thresholds())),
48 recent_events: Arc::new(RwLock::new(HashMap::new())),
49 }
50 }
51
52 fn default_thresholds() -> HashMap<String, AnomalyThresholdConfig> {
53 let mut thresholds = HashMap::new();
54
55 thresholds.insert(
56 "requests_per_minute".into(),
57 AnomalyThresholdConfig {
58 warning_threshold: 15.0,
59 critical_threshold: 30.0,
60 },
61 );
62
63 thresholds.insert(
64 "session_count_per_fingerprint".into(),
65 AnomalyThresholdConfig {
66 warning_threshold: 5.0,
67 critical_threshold: 10.0,
68 },
69 );
70
71 thresholds.insert(
72 "error_rate".into(),
73 AnomalyThresholdConfig {
74 warning_threshold: 0.1,
75 critical_threshold: 0.25,
76 },
77 );
78
79 thresholds
80 }
81
82 pub async fn check_anomaly(&self, metric_name: &str, value: f64) -> AnomalyCheckResult {
83 let level = {
84 let thresholds = self.thresholds.read().await;
85 thresholds
86 .get(metric_name)
87 .map_or(AnomalyLevel::Normal, |t| {
88 if value >= t.critical_threshold {
89 AnomalyLevel::Critical
90 } else if value >= t.warning_threshold {
91 AnomalyLevel::Warning
92 } else {
93 AnomalyLevel::Normal
94 }
95 })
96 };
97
98 let message = match level {
99 AnomalyLevel::Critical => Some(format!(
100 "CRITICAL: {metric_name} = {value:.2} exceeds critical threshold"
101 )),
102 AnomalyLevel::Warning => Some(format!(
103 "WARNING: {metric_name} = {value:.2} exceeds warning threshold"
104 )),
105 AnomalyLevel::Normal => None,
106 };
107
108 AnomalyCheckResult {
109 metric_name: metric_name.to_string(),
110 current_value: value,
111 level,
112 message,
113 }
114 }
115
116 pub async fn record_event(&self, metric_name: &str, value: f64) {
117 let now = Utc::now();
118 let cutoff = now - Duration::hours(1);
119 let key = metric_name.to_string();
120 let event = AnomalyEvent {
121 timestamp: now,
122 value,
123 };
124
125 let mut events = self.recent_events.write().await;
126 events.entry(key).or_default().push(event);
127 if let Some(metric_events) = events.get_mut(metric_name) {
128 metric_events.retain(|e| e.timestamp > cutoff);
129 }
130 }
131
132 pub async fn check_trend_anomaly(
133 &self,
134 metric_name: &str,
135 window_minutes: i64,
136 ) -> Option<AnomalyCheckResult> {
137 let metric_events = {
138 let events = self.recent_events.read().await;
139 events.get(metric_name).cloned()?
140 };
141
142 if metric_events.len() < 2 {
143 return None;
144 }
145
146 let cutoff = Utc::now() - Duration::minutes(window_minutes);
147 let recent: Vec<_> = metric_events
148 .iter()
149 .filter(|e| e.timestamp > cutoff)
150 .collect();
151
152 if recent.is_empty() {
153 return None;
154 }
155
156 let avg: f64 = recent.iter().map(|e| e.value).sum::<f64>() / recent.len() as f64;
157 let latest = recent.last()?.value;
158 let spike_ratio = if avg > 0.0 { latest / avg } else { 1.0 };
159
160 if spike_ratio > 3.0 {
161 Some(AnomalyCheckResult {
162 metric_name: metric_name.to_string(),
163 current_value: latest,
164 level: AnomalyLevel::Critical,
165 message: Some(format!(
166 "Spike: {metric_name} jumped {spike_ratio:.1}x above average"
167 )),
168 })
169 } else if spike_ratio > 2.0 {
170 Some(AnomalyCheckResult {
171 metric_name: metric_name.to_string(),
172 current_value: latest,
173 level: AnomalyLevel::Warning,
174 message: Some(format!(
175 "Elevated: {metric_name} is {spike_ratio:.1}x above average"
176 )),
177 })
178 } else {
179 None
180 }
181 }
182
183 pub async fn update_threshold(&self, metric_name: &str, warning: f64, critical: f64) {
184 let mut thresholds = self.thresholds.write().await;
185 thresholds.insert(
186 metric_name.to_string(),
187 AnomalyThresholdConfig {
188 warning_threshold: warning,
189 critical_threshold: critical,
190 },
191 );
192 }
193
194 pub async fn get_recent_events(&self, metric_name: &str) -> Vec<AnomalyEvent> {
195 let events = self.recent_events.read().await;
196 events.get(metric_name).cloned().unwrap_or_else(Vec::new)
197 }
198}
199
200impl Default for AnomalyDetectionService {
201 fn default() -> Self {
202 Self::new()
203 }
204}