1use std::collections::VecDeque;
30
31use dashmap::DashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36use serde::{Deserialize, Serialize};
37
38pub mod ewma;
39pub mod sql_injection;
40
41pub use ewma::{Ewma, RateWindow};
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "lowercase")]
47pub enum Severity {
48 Info,
49 Warning,
50 Critical,
51}
52
53#[derive(Debug, Clone, Serialize)]
55#[serde(tag = "kind", rename_all = "snake_case")]
56pub enum AnomalyEvent {
57 RateSpike {
59 tenant: String,
60 rate_per_sec: f64,
61 baseline: f64,
62 z_score: f64,
63 severity: Severity,
64 detected_at: String,
65 },
66 AuthBurst {
68 user: String,
69 client_ip: String,
70 failures: u32,
71 window_secs: u32,
72 severity: Severity,
73 detected_at: String,
74 },
75 SqlInjection {
78 sql_excerpt: String,
79 patterns_matched: Vec<String>,
80 severity: Severity,
81 detected_at: String,
82 },
83 NovelQuery {
85 fingerprint: String,
86 sql_excerpt: String,
87 detected_at: String,
88 },
89}
90
91impl AnomalyEvent {
92 pub fn severity(&self) -> Severity {
93 match self {
94 AnomalyEvent::RateSpike { severity, .. } => *severity,
95 AnomalyEvent::AuthBurst { severity, .. } => *severity,
96 AnomalyEvent::SqlInjection { severity, .. } => *severity,
97 AnomalyEvent::NovelQuery { .. } => Severity::Info,
98 }
99 }
100}
101
/// Tunable thresholds and limits for the anomaly detector.
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
    /// Window length, in seconds, handed to each tenant's rate window.
    pub rate_window_secs: u64,
    /// Minimum z-score that produces a rate-spike event; twice this value is critical.
    pub spike_z_threshold: f64,
    /// Sliding-window length, in seconds, for counting authentication failures.
    pub auth_window_secs: u64,
    /// Failure count inside the window at which an auth burst is critical.
    pub auth_critical_count: u32,
    /// Failure count inside the window at which an auth burst is a warning.
    pub auth_warning_count: u32,
    /// Maximum number of events kept in the in-memory event buffer.
    pub event_buffer_size: usize,
    /// Whether first-seen query fingerprints are reported as events.
    pub emit_novel_queries: bool,
}

impl Default for AnomalyConfig {
    /// One-minute windows, z-score threshold of 3, auth thresholds of 5 and 10
    /// failures, a 1024-event buffer, and novel-query reporting enabled.
    fn default() -> Self {
        const ONE_MINUTE_SECS: u64 = 60;

        AnomalyConfig {
            emit_novel_queries: true,
            event_buffer_size: 1024,
            auth_warning_count: 5,
            auth_critical_count: 10,
            auth_window_secs: ONE_MINUTE_SECS,
            spike_z_threshold: 3.0,
            rate_window_secs: ONE_MINUTE_SECS,
        }
    }
}
136
137#[derive(Clone)]
141pub struct AnomalyDetector {
142 config: Arc<AnomalyConfig>,
143 rate_windows: Arc<DashMap<String, RateWindow>>,
147 auth_windows: Arc<DashMap<(String, String), AuthBurstWindow>>,
148 seen_fingerprints: Arc<DashMap<String, ()>>,
149 events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
150}
151
/// Upper bound on tracked query fingerprints. `record_query` clears the whole
/// set once it has reached this size, before inserting the next new fingerprint.
const MAX_SEEN_FINGERPRINTS: usize = 100_000;
156
157impl AnomalyDetector {
158 pub fn new(config: AnomalyConfig) -> Self {
159 Self {
160 config: Arc::new(config),
161 rate_windows: Arc::new(DashMap::new()),
162 auth_windows: Arc::new(DashMap::new()),
163 seen_fingerprints: Arc::new(DashMap::new()),
164 events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
165 }
166 }
167
168 pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
172 let mut emitted = Vec::new();
173
174 {
176 let mut window = self
177 .rate_windows
178 .entry(ctx.tenant.clone())
179 .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
180 if let Some(spike) = window.observe_and_score(ctx.timestamp) {
181 if spike.z_score >= self.config.spike_z_threshold {
182 let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
183 Severity::Critical
184 } else {
185 Severity::Warning
186 };
187 let ev = AnomalyEvent::RateSpike {
188 tenant: ctx.tenant.clone(),
189 rate_per_sec: spike.rate,
190 baseline: spike.baseline,
191 z_score: spike.z_score,
192 severity,
193 detected_at: chrono::Utc::now().to_rfc3339(),
194 };
195 drop(window); emitted.push(ev.clone());
197 self.push_event(ev);
198 }
199 }
200 }
201
202 if self.config.emit_novel_queries && !self.seen_fingerprints.contains_key(&ctx.fingerprint)
205 {
206 if self.seen_fingerprints.len() >= MAX_SEEN_FINGERPRINTS {
208 self.seen_fingerprints.clear();
209 }
210 if self
213 .seen_fingerprints
214 .insert(ctx.fingerprint.clone(), ())
215 .is_none()
216 {
217 let ev = AnomalyEvent::NovelQuery {
218 fingerprint: ctx.fingerprint.clone(),
219 sql_excerpt: excerpt(&ctx.sql, 120),
220 detected_at: chrono::Utc::now().to_rfc3339(),
221 };
222 emitted.push(ev.clone());
223 self.push_event(ev);
224 }
225 }
226
227 let matches = sql_injection::scan(&ctx.sql);
231 if !matches.is_empty() {
232 let severity = if matches.len() >= 2 {
233 Severity::Critical
234 } else {
235 Severity::Warning
236 };
237 let ev = AnomalyEvent::SqlInjection {
238 sql_excerpt: excerpt(&ctx.sql, 200),
239 patterns_matched: matches,
240 severity,
241 detected_at: chrono::Utc::now().to_rfc3339(),
242 };
243 emitted.push(ev.clone());
244 self.push_event(ev);
245 }
246
247 emitted
248 }
249
250 pub fn record_auth(
253 &self,
254 user: &str,
255 client_ip: &str,
256 succeeded: bool,
257 timestamp: Instant,
258 iso_timestamp: &str,
259 ) -> Option<AnomalyEvent> {
260 if succeeded {
261 self.auth_windows
264 .remove(&(user.to_string(), client_ip.to_string()));
265 return None;
266 }
267 let count = {
268 let mut window = self
269 .auth_windows
270 .entry((user.to_string(), client_ip.to_string()))
271 .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
272 window.record_failure(timestamp)
273 };
274 let severity = if count >= self.config.auth_critical_count {
275 Severity::Critical
276 } else if count >= self.config.auth_warning_count {
277 Severity::Warning
278 } else {
279 return None;
280 };
281 let ev = AnomalyEvent::AuthBurst {
282 user: user.to_string(),
283 client_ip: client_ip.to_string(),
284 failures: count,
285 window_secs: self.config.auth_window_secs as u32,
286 severity,
287 detected_at: iso_timestamp.to_string(),
288 };
289 self.push_event(ev.clone());
290 Some(ev)
291 }
292
293 pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
295 let evs = self.events.read();
296 let n = limit.min(evs.len());
297 let mut out = Vec::with_capacity(n);
298 for ev in evs.iter().rev().take(n) {
299 out.push(ev.clone());
300 }
301 out
302 }
303
304 pub fn event_count(&self) -> usize {
307 self.events.read().len()
308 }
309
310 fn push_event(&self, ev: AnomalyEvent) {
311 let mut evs = self.events.write();
312 if evs.len() >= self.config.event_buffer_size {
313 evs.pop_front();
314 }
315 evs.push_back(ev);
316 }
317}
318
/// One observed query, as handed to the detector's `record_query`.
#[derive(Debug, Clone)]
pub struct QueryObservation {
    /// Tenant the query belongs to; selects the per-tenant rate window.
    pub tenant: String,
    /// Query fingerprint used for novel-query detection.
    pub fingerprint: String,
    /// SQL text; scanned for injection patterns and excerpted into events.
    pub sql: String,
    /// Instant of the observation, passed to the rate window.
    pub timestamp: Instant,
}
335
/// Sliding window of authentication-failure timestamps.
struct AuthBurstWindow {
    /// Failures older than this, relative to the newest one, are dropped.
    window: Duration,
    /// Failure instants in arrival order, oldest at the front.
    failures: VecDeque<Instant>,
}

impl AuthBurstWindow {
    /// Creates an empty window spanning `window_secs` seconds.
    fn new(window_secs: u64) -> Self {
        Self {
            window: Duration::from_secs(window_secs),
            failures: VecDeque::new(),
        }
    }

    /// Records a failure at `now` and returns how many failures the window holds.
    ///
    /// Entries strictly older than the window are evicted first; an entry
    /// exactly `window` old is kept. The returned count saturates at
    /// `u32::MAX`.
    fn record_failure(&mut self, now: Instant) -> u32 {
        while let Some(&oldest) = self.failures.front() {
            // Timestamps come from callers and may arrive out of order.
            // `saturating_duration_since` treats an entry newer than `now` as
            // age zero, whereas `duration_since` is documented as allowed to
            // panic in that case.
            if now.saturating_duration_since(oldest) > self.window {
                self.failures.pop_front();
            } else {
                break;
            }
        }
        self.failures.push_back(now);
        self.failures.len().min(u32::MAX as usize) as u32
    }
}
364
/// Returns `s` unchanged when it is at most `max` bytes long, otherwise a
/// prefix of at most `max` bytes followed by `…`.
///
/// The cut point is moved back to the nearest UTF-8 character boundary, so
/// text containing multi-byte characters never causes a slicing panic.
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // `max < s.len()` here, and offset 0 is always a boundary, so this terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an observation stamped with the current instant.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            tenant: tenant.to_owned(),
            fingerprint: fp.to_owned(),
            sql: sql.to_owned(),
            timestamp: Instant::now(),
        }
    }

    /// Detector built from the default configuration.
    fn default_detector() -> AnomalyDetector {
        AnomalyDetector::new(AnomalyConfig::default())
    }

    /// True when `events` contains at least one `NovelQuery`.
    fn has_novel_query(events: &[AnomalyEvent]) -> bool {
        events
            .iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Records `attempts` failed logins for alice at `at`; returns the last result.
    fn fail_logins(d: &AnomalyDetector, attempts: usize, at: Instant) -> Option<AnomalyEvent> {
        let mut last = None;
        for _ in 0..attempts {
            last = d.record_auth("alice", "10.0.0.1", false, at, "ts");
        }
        last
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let d = default_detector();
        let first = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(has_novel_query(&first));
        let second = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel_query(&second));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let cfg = AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel_query(&evs));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let d = default_detector();
        let payload = "SELECT * FROM users WHERE id = 1 OR 1=1 --";
        let evs = d.record_query(&obs("acme", "fp-inj", payload));
        let flagged = evs
            .iter()
            .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
        assert!(flagged, "expected SqlInjection event in {:?}", evs);
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let d = default_detector();
        match fail_logins(&d, 6, Instant::now()) {
            Some(AnomalyEvent::AuthBurst {
                failures, severity, ..
            }) => {
                assert_eq!(failures, 6);
                assert_eq!(severity, Severity::Warning);
            }
            other => panic!("expected AuthBurst Warning, got {:?}", other),
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let d = default_detector();
        match fail_logins(&d, 12, Instant::now()) {
            Some(AnomalyEvent::AuthBurst {
                failures, severity, ..
            }) => {
                assert_eq!(failures, 12);
                assert_eq!(severity, Severity::Critical);
            }
            other => panic!("expected AuthBurst Critical, got {:?}", other),
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let d = default_detector();
        let now = Instant::now();
        let _ = fail_logins(&d, 6, now);
        let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
        // The window was discarded on success, so one new failure is below
        // the warning count again.
        let after_reset = d.record_auth("alice", "10.0.0.1", false, now, "ts");
        assert!(after_reset.is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let d = default_detector();
        for (fp, sql) in [("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")].iter() {
            let _ = d.record_query(&obs("a", fp, sql));
        }
        let recent = d.recent_events(10);
        match &recent[0] {
            AnomalyEvent::NovelQuery { fingerprint, .. } => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let d = default_detector();
        for i in 0..50 {
            let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        }
        assert_eq!(d.recent_events(10).len(), 10);
        assert_eq!(d.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let cfg = AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        for i in 0..20 {
            let fp = format!("fp{}", i);
            let _ = d.record_query(&obs("a", &fp, "SELECT 1"));
        }
        assert_eq!(d.event_count(), 5);
    }
}