1use std::collections::{HashMap, VecDeque};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use parking_lot::RwLock;
34use serde::{Deserialize, Serialize};
35
36pub mod ewma;
37pub mod sql_injection;
38
39pub use ewma::{Ewma, RateWindow};
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum Severity {
46 Info,
47 Warning,
48 Critical,
49}
50
51#[derive(Debug, Clone, Serialize)]
53#[serde(tag = "kind", rename_all = "snake_case")]
54pub enum AnomalyEvent {
55 RateSpike {
57 tenant: String,
58 rate_per_sec: f64,
59 baseline: f64,
60 z_score: f64,
61 severity: Severity,
62 detected_at: String,
63 },
64 AuthBurst {
66 user: String,
67 client_ip: String,
68 failures: u32,
69 window_secs: u32,
70 severity: Severity,
71 detected_at: String,
72 },
73 SqlInjection {
76 sql_excerpt: String,
77 patterns_matched: Vec<String>,
78 severity: Severity,
79 detected_at: String,
80 },
81 NovelQuery {
83 fingerprint: String,
84 sql_excerpt: String,
85 detected_at: String,
86 },
87}
88
89impl AnomalyEvent {
90 pub fn severity(&self) -> Severity {
91 match self {
92 AnomalyEvent::RateSpike { severity, .. } => *severity,
93 AnomalyEvent::AuthBurst { severity, .. } => *severity,
94 AnomalyEvent::SqlInjection { severity, .. } => *severity,
95 AnomalyEvent::NovelQuery { .. } => Severity::Info,
96 }
97 }
98}
99
100#[derive(Debug, Clone)]
103pub struct AnomalyConfig {
104 pub rate_window_secs: u64,
106 pub spike_z_threshold: f64,
108 pub auth_window_secs: u64,
110 pub auth_critical_count: u32,
112 pub auth_warning_count: u32,
114 pub event_buffer_size: usize,
116 pub emit_novel_queries: bool,
119}
120
121impl Default for AnomalyConfig {
122 fn default() -> Self {
123 Self {
124 rate_window_secs: 60,
125 spike_z_threshold: 3.0,
126 auth_window_secs: 60,
127 auth_critical_count: 10,
128 auth_warning_count: 5,
129 event_buffer_size: 1024,
130 emit_novel_queries: true,
131 }
132 }
133}
134
135#[derive(Clone)]
139pub struct AnomalyDetector {
140 config: Arc<AnomalyConfig>,
141 rate_windows: Arc<RwLock<HashMap<String, RateWindow>>>,
142 auth_windows: Arc<RwLock<HashMap<(String, String), AuthBurstWindow>>>,
143 seen_fingerprints: Arc<RwLock<HashMap<String, ()>>>,
144 events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
145}
146
147impl AnomalyDetector {
148 pub fn new(config: AnomalyConfig) -> Self {
149 Self {
150 config: Arc::new(config),
151 rate_windows: Arc::new(RwLock::new(HashMap::new())),
152 auth_windows: Arc::new(RwLock::new(HashMap::new())),
153 seen_fingerprints: Arc::new(RwLock::new(HashMap::new())),
154 events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
155 }
156 }
157
158 pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
162 let mut emitted = Vec::new();
163
164 let mut rates = self.rate_windows.write();
166 let window = rates
167 .entry(ctx.tenant.clone())
168 .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
169 if let Some(spike) = window.observe_and_score(ctx.timestamp) {
170 if spike.z_score >= self.config.spike_z_threshold {
171 let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
172 Severity::Critical
173 } else {
174 Severity::Warning
175 };
176 let ev = AnomalyEvent::RateSpike {
177 tenant: ctx.tenant.clone(),
178 rate_per_sec: spike.rate,
179 baseline: spike.baseline,
180 z_score: spike.z_score,
181 severity,
182 detected_at: ctx.iso_timestamp.clone(),
183 };
184 emitted.push(ev.clone());
185 self.push_event(ev);
186 }
187 }
188 drop(rates);
189
190 if self.config.emit_novel_queries {
192 let mut seen = self.seen_fingerprints.write();
193 if !seen.contains_key(&ctx.fingerprint) {
194 seen.insert(ctx.fingerprint.clone(), ());
195 let ev = AnomalyEvent::NovelQuery {
196 fingerprint: ctx.fingerprint.clone(),
197 sql_excerpt: excerpt(&ctx.sql, 120),
198 detected_at: ctx.iso_timestamp.clone(),
199 };
200 emitted.push(ev.clone());
201 self.push_event(ev);
202 }
203 }
204
205 let matches = sql_injection::scan(&ctx.sql);
209 if !matches.is_empty() {
210 let severity = if matches.len() >= 2 {
211 Severity::Critical
212 } else {
213 Severity::Warning
214 };
215 let ev = AnomalyEvent::SqlInjection {
216 sql_excerpt: excerpt(&ctx.sql, 200),
217 patterns_matched: matches,
218 severity,
219 detected_at: ctx.iso_timestamp.clone(),
220 };
221 emitted.push(ev.clone());
222 self.push_event(ev);
223 }
224
225 emitted
226 }
227
228 pub fn record_auth(
231 &self,
232 user: &str,
233 client_ip: &str,
234 succeeded: bool,
235 timestamp: Instant,
236 iso_timestamp: &str,
237 ) -> Option<AnomalyEvent> {
238 if succeeded {
239 self.auth_windows
242 .write()
243 .remove(&(user.to_string(), client_ip.to_string()));
244 return None;
245 }
246 let mut windows = self.auth_windows.write();
247 let window = windows
248 .entry((user.to_string(), client_ip.to_string()))
249 .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
250 let count = window.record_failure(timestamp);
251 let severity = if count >= self.config.auth_critical_count {
252 Severity::Critical
253 } else if count >= self.config.auth_warning_count {
254 Severity::Warning
255 } else {
256 return None;
257 };
258 let ev = AnomalyEvent::AuthBurst {
259 user: user.to_string(),
260 client_ip: client_ip.to_string(),
261 failures: count,
262 window_secs: self.config.auth_window_secs as u32,
263 severity,
264 detected_at: iso_timestamp.to_string(),
265 };
266 drop(windows);
267 self.push_event(ev.clone());
268 Some(ev)
269 }
270
271 pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
273 let evs = self.events.read();
274 let n = limit.min(evs.len());
275 let mut out = Vec::with_capacity(n);
276 for ev in evs.iter().rev().take(n) {
277 out.push(ev.clone());
278 }
279 out
280 }
281
282 pub fn event_count(&self) -> usize {
285 self.events.read().len()
286 }
287
288 fn push_event(&self, ev: AnomalyEvent) {
289 let mut evs = self.events.write();
290 if evs.len() >= self.config.event_buffer_size {
291 evs.pop_front();
292 }
293 evs.push_back(ev);
294 }
295}
296
297#[derive(Debug, Clone)]
301pub struct QueryObservation {
302 pub tenant: String,
304 pub fingerprint: String,
307 pub sql: String,
309 pub timestamp: Instant,
312 pub iso_timestamp: String,
315}
316
317struct AuthBurstWindow {
320 window: Duration,
321 failures: VecDeque<Instant>,
322}
323
324impl AuthBurstWindow {
325 fn new(window_secs: u64) -> Self {
326 Self {
327 window: Duration::from_secs(window_secs),
328 failures: VecDeque::new(),
329 }
330 }
331
332 fn record_failure(&mut self, now: Instant) -> u32 {
333 while let Some(&front) = self.failures.front() {
335 if now.duration_since(front) > self.window {
336 self.failures.pop_front();
337 } else {
338 break;
339 }
340 }
341 self.failures.push_back(now);
342 self.failures.len() as u32
343 }
344}
345
346fn excerpt(s: &str, max: usize) -> String {
347 if s.len() <= max {
348 s.to_string()
349 } else {
350 format!("{}…", &s[..max])
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use super::*;
357
358 fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
359 QueryObservation {
360 tenant: tenant.into(),
361 fingerprint: fp.into(),
362 sql: sql.into(),
363 timestamp: Instant::now(),
364 iso_timestamp: "2026-04-25T13:30:00Z".into(),
365 }
366 }
367
368 #[test]
369 fn novel_query_fires_once_per_fingerprint() {
370 let d = AnomalyDetector::new(AnomalyConfig::default());
371 let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
372 assert!(evs
373 .iter()
374 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
375 let evs2 = d.record_query(&obs("acme", "fp1", "SELECT 1"));
376 assert!(!evs2
377 .iter()
378 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
379 }
380
381 #[test]
382 fn novel_query_can_be_suppressed_via_config() {
383 let mut cfg = AnomalyConfig::default();
384 cfg.emit_novel_queries = false;
385 let d = AnomalyDetector::new(cfg);
386 let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
387 assert!(!evs
388 .iter()
389 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
390 }
391
392 #[test]
393 fn sql_injection_detector_flags_classic_or_payload() {
394 let d = AnomalyDetector::new(AnomalyConfig::default());
395 let evs = d.record_query(&obs(
396 "acme",
397 "fp-inj",
398 "SELECT * FROM users WHERE id = 1 OR 1=1 --",
399 ));
400 let sqli = evs
401 .iter()
402 .find(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
403 assert!(sqli.is_some(), "expected SqlInjection event in {:?}", evs);
404 }
405
406 #[test]
407 fn auth_burst_warning_below_critical_threshold() {
408 let d = AnomalyDetector::new(AnomalyConfig::default());
409 let now = Instant::now();
410 let mut last = None;
411 for _ in 0..6 {
412 last = d.record_auth("alice", "10.0.0.1", false, now, "ts");
413 }
414 match last {
415 Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
416 assert_eq!(failures, 6);
417 assert_eq!(severity, Severity::Warning);
418 }
419 other => panic!("expected AuthBurst Warning, got {:?}", other),
420 }
421 }
422
423 #[test]
424 fn auth_burst_critical_at_high_threshold() {
425 let d = AnomalyDetector::new(AnomalyConfig::default());
426 let now = Instant::now();
427 let mut last = None;
428 for _ in 0..12 {
429 last = d.record_auth("alice", "10.0.0.1", false, now, "ts");
430 }
431 match last {
432 Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
433 assert_eq!(failures, 12);
434 assert_eq!(severity, Severity::Critical);
435 }
436 other => panic!("expected AuthBurst Critical, got {:?}", other),
437 }
438 }
439
440 #[test]
441 fn auth_success_resets_burst_window() {
442 let d = AnomalyDetector::new(AnomalyConfig::default());
443 let now = Instant::now();
444 for _ in 0..6 {
445 let _ = d.record_auth("alice", "10.0.0.1", false, now, "ts");
446 }
447 let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
449 let r = d.record_auth("alice", "10.0.0.1", false, now, "ts");
450 assert!(r.is_none());
452 }
453
454 #[test]
455 fn recent_events_returns_newest_first() {
456 let d = AnomalyDetector::new(AnomalyConfig::default());
457 let _ = d.record_query(&obs("a", "fp1", "SELECT 1"));
458 let _ = d.record_query(&obs("a", "fp2", "SELECT 2"));
459 let _ = d.record_query(&obs("a", "fp3", "SELECT 3"));
460 let recent = d.recent_events(10);
461 match &recent[0] {
463 AnomalyEvent::NovelQuery { fingerprint, .. } => {
464 assert_eq!(fingerprint, "fp3")
465 }
466 other => panic!("expected NovelQuery fp3, got {:?}", other),
467 }
468 }
469
470 #[test]
471 fn recent_events_respects_limit() {
472 let d = AnomalyDetector::new(AnomalyConfig::default());
473 for i in 0..50 {
474 let fp = format!("fp{}", i);
475 let _ = d.record_query(&obs("a", &fp, "SELECT 1"));
476 }
477 assert_eq!(d.recent_events(10).len(), 10);
478 assert_eq!(d.recent_events(100).len(), 50);
479 }
480
481 #[test]
482 fn event_buffer_evicts_oldest_when_full() {
483 let mut cfg = AnomalyConfig::default();
484 cfg.event_buffer_size = 5;
485 let d = AnomalyDetector::new(cfg);
486 for i in 0..20 {
487 let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
488 }
489 assert_eq!(d.event_count(), 5);
493 }
494}