1use std::collections::VecDeque;
30
31use dashmap::DashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36use serde::{Deserialize, Serialize};
37
38pub mod ewma;
39pub mod sql_injection;
40
41pub use ewma::{Ewma, RateWindow};
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "lowercase")]
47pub enum Severity {
48 Info,
49 Warning,
50 Critical,
51}
52
53#[derive(Debug, Clone, Serialize)]
55#[serde(tag = "kind", rename_all = "snake_case")]
56pub enum AnomalyEvent {
57 RateSpike {
59 tenant: String,
60 rate_per_sec: f64,
61 baseline: f64,
62 z_score: f64,
63 severity: Severity,
64 detected_at: String,
65 },
66 AuthBurst {
68 user: String,
69 client_ip: String,
70 failures: u32,
71 window_secs: u32,
72 severity: Severity,
73 detected_at: String,
74 },
75 SqlInjection {
78 sql_excerpt: String,
79 patterns_matched: Vec<String>,
80 severity: Severity,
81 detected_at: String,
82 },
83 NovelQuery {
85 fingerprint: String,
86 sql_excerpt: String,
87 detected_at: String,
88 },
89}
90
91impl AnomalyEvent {
92 pub fn severity(&self) -> Severity {
93 match self {
94 AnomalyEvent::RateSpike { severity, .. } => *severity,
95 AnomalyEvent::AuthBurst { severity, .. } => *severity,
96 AnomalyEvent::SqlInjection { severity, .. } => *severity,
97 AnomalyEvent::NovelQuery { .. } => Severity::Info,
98 }
99 }
100}
101
102#[derive(Debug, Clone)]
105pub struct AnomalyConfig {
106 pub rate_window_secs: u64,
108 pub spike_z_threshold: f64,
110 pub auth_window_secs: u64,
112 pub auth_critical_count: u32,
114 pub auth_warning_count: u32,
116 pub event_buffer_size: usize,
118 pub emit_novel_queries: bool,
121}
122
123impl Default for AnomalyConfig {
124 fn default() -> Self {
125 Self {
126 rate_window_secs: 60,
127 spike_z_threshold: 3.0,
128 auth_window_secs: 60,
129 auth_critical_count: 10,
130 auth_warning_count: 5,
131 event_buffer_size: 1024,
132 emit_novel_queries: true,
133 }
134 }
135}
136
137#[derive(Clone)]
141pub struct AnomalyDetector {
142 config: Arc<AnomalyConfig>,
143 rate_windows: Arc<DashMap<String, RateWindow>>,
147 auth_windows: Arc<DashMap<(String, String), AuthBurstWindow>>,
148 seen_fingerprints: Arc<DashMap<String, ()>>,
149 events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
150}
151
152const MAX_SEEN_FINGERPRINTS: usize = 100_000;
156
157impl AnomalyDetector {
158 pub fn new(config: AnomalyConfig) -> Self {
159 Self {
160 config: Arc::new(config),
161 rate_windows: Arc::new(DashMap::new()),
162 auth_windows: Arc::new(DashMap::new()),
163 seen_fingerprints: Arc::new(DashMap::new()),
164 events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
165 }
166 }
167
168 pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
172 let mut emitted = Vec::new();
173
174 {
176 let mut window = self
177 .rate_windows
178 .entry(ctx.tenant.clone())
179 .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
180 if let Some(spike) = window.observe_and_score(ctx.timestamp) {
181 if spike.z_score >= self.config.spike_z_threshold {
182 let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
183 Severity::Critical
184 } else {
185 Severity::Warning
186 };
187 let ev = AnomalyEvent::RateSpike {
188 tenant: ctx.tenant.clone(),
189 rate_per_sec: spike.rate,
190 baseline: spike.baseline,
191 z_score: spike.z_score,
192 severity,
193 detected_at: chrono::Utc::now().to_rfc3339(),
194 };
195 drop(window); emitted.push(ev.clone());
197 self.push_event(ev);
198 }
199 }
200 }
201
202 if self.config.emit_novel_queries
205 && !self.seen_fingerprints.contains_key(&ctx.fingerprint)
206 {
207 if self.seen_fingerprints.len() >= MAX_SEEN_FINGERPRINTS {
209 self.seen_fingerprints.clear();
210 }
211 if self
214 .seen_fingerprints
215 .insert(ctx.fingerprint.clone(), ())
216 .is_none()
217 {
218 let ev = AnomalyEvent::NovelQuery {
219 fingerprint: ctx.fingerprint.clone(),
220 sql_excerpt: excerpt(&ctx.sql, 120),
221 detected_at: chrono::Utc::now().to_rfc3339(),
222 };
223 emitted.push(ev.clone());
224 self.push_event(ev);
225 }
226 }
227
228 let matches = sql_injection::scan(&ctx.sql);
232 if !matches.is_empty() {
233 let severity = if matches.len() >= 2 {
234 Severity::Critical
235 } else {
236 Severity::Warning
237 };
238 let ev = AnomalyEvent::SqlInjection {
239 sql_excerpt: excerpt(&ctx.sql, 200),
240 patterns_matched: matches,
241 severity,
242 detected_at: chrono::Utc::now().to_rfc3339(),
243 };
244 emitted.push(ev.clone());
245 self.push_event(ev);
246 }
247
248 emitted
249 }
250
251 pub fn record_auth(
254 &self,
255 user: &str,
256 client_ip: &str,
257 succeeded: bool,
258 timestamp: Instant,
259 iso_timestamp: &str,
260 ) -> Option<AnomalyEvent> {
261 if succeeded {
262 self.auth_windows
265 .remove(&(user.to_string(), client_ip.to_string()));
266 return None;
267 }
268 let count = {
269 let mut window = self
270 .auth_windows
271 .entry((user.to_string(), client_ip.to_string()))
272 .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
273 window.record_failure(timestamp)
274 };
275 let severity = if count >= self.config.auth_critical_count {
276 Severity::Critical
277 } else if count >= self.config.auth_warning_count {
278 Severity::Warning
279 } else {
280 return None;
281 };
282 let ev = AnomalyEvent::AuthBurst {
283 user: user.to_string(),
284 client_ip: client_ip.to_string(),
285 failures: count,
286 window_secs: self.config.auth_window_secs as u32,
287 severity,
288 detected_at: iso_timestamp.to_string(),
289 };
290 self.push_event(ev.clone());
291 Some(ev)
292 }
293
294 pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
296 let evs = self.events.read();
297 let n = limit.min(evs.len());
298 let mut out = Vec::with_capacity(n);
299 for ev in evs.iter().rev().take(n) {
300 out.push(ev.clone());
301 }
302 out
303 }
304
305 pub fn event_count(&self) -> usize {
308 self.events.read().len()
309 }
310
311 fn push_event(&self, ev: AnomalyEvent) {
312 let mut evs = self.events.write();
313 if evs.len() >= self.config.event_buffer_size {
314 evs.pop_front();
315 }
316 evs.push_back(ev);
317 }
318}
319
320#[derive(Debug, Clone)]
324pub struct QueryObservation {
325 pub tenant: String,
327 pub fingerprint: String,
330 pub sql: String,
332 pub timestamp: Instant,
335}
336
337struct AuthBurstWindow {
340 window: Duration,
341 failures: VecDeque<Instant>,
342}
343
344impl AuthBurstWindow {
345 fn new(window_secs: u64) -> Self {
346 Self {
347 window: Duration::from_secs(window_secs),
348 failures: VecDeque::new(),
349 }
350 }
351
352 fn record_failure(&mut self, now: Instant) -> u32 {
353 while let Some(&front) = self.failures.front() {
355 if now.duration_since(front) > self.window {
356 self.failures.pop_front();
357 } else {
358 break;
359 }
360 }
361 self.failures.push_back(now);
362 self.failures.len() as u32
363 }
364}
365
366fn excerpt(s: &str, max: usize) -> String {
367 if s.len() <= max {
368 s.to_string()
369 } else {
370 format!("{}…", &s[..max])
371 }
372}
373
374#[cfg(test)]
375mod tests {
376 use super::*;
377
378 fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
379 QueryObservation {
380 tenant: tenant.into(),
381 fingerprint: fp.into(),
382 sql: sql.into(),
383 timestamp: Instant::now(),
384 }
385 }
386
387 #[test]
388 fn novel_query_fires_once_per_fingerprint() {
389 let d = AnomalyDetector::new(AnomalyConfig::default());
390 let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
391 assert!(evs
392 .iter()
393 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
394 let evs2 = d.record_query(&obs("acme", "fp1", "SELECT 1"));
395 assert!(!evs2
396 .iter()
397 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
398 }
399
400 #[test]
401 fn novel_query_can_be_suppressed_via_config() {
402 let mut cfg = AnomalyConfig::default();
403 cfg.emit_novel_queries = false;
404 let d = AnomalyDetector::new(cfg);
405 let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
406 assert!(!evs
407 .iter()
408 .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. })));
409 }
410
411 #[test]
412 fn sql_injection_detector_flags_classic_or_payload() {
413 let d = AnomalyDetector::new(AnomalyConfig::default());
414 let evs = d.record_query(&obs(
415 "acme",
416 "fp-inj",
417 "SELECT * FROM users WHERE id = 1 OR 1=1 --",
418 ));
419 let sqli = evs
420 .iter()
421 .find(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
422 assert!(sqli.is_some(), "expected SqlInjection event in {:?}", evs);
423 }
424
425 #[test]
426 fn auth_burst_warning_below_critical_threshold() {
427 let d = AnomalyDetector::new(AnomalyConfig::default());
428 let now = Instant::now();
429 let mut last = None;
430 for _ in 0..6 {
431 last = d.record_auth("alice", "10.0.0.1", false, now, "ts");
432 }
433 match last {
434 Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
435 assert_eq!(failures, 6);
436 assert_eq!(severity, Severity::Warning);
437 }
438 other => panic!("expected AuthBurst Warning, got {:?}", other),
439 }
440 }
441
442 #[test]
443 fn auth_burst_critical_at_high_threshold() {
444 let d = AnomalyDetector::new(AnomalyConfig::default());
445 let now = Instant::now();
446 let mut last = None;
447 for _ in 0..12 {
448 last = d.record_auth("alice", "10.0.0.1", false, now, "ts");
449 }
450 match last {
451 Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
452 assert_eq!(failures, 12);
453 assert_eq!(severity, Severity::Critical);
454 }
455 other => panic!("expected AuthBurst Critical, got {:?}", other),
456 }
457 }
458
459 #[test]
460 fn auth_success_resets_burst_window() {
461 let d = AnomalyDetector::new(AnomalyConfig::default());
462 let now = Instant::now();
463 for _ in 0..6 {
464 let _ = d.record_auth("alice", "10.0.0.1", false, now, "ts");
465 }
466 let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
468 let r = d.record_auth("alice", "10.0.0.1", false, now, "ts");
469 assert!(r.is_none());
471 }
472
473 #[test]
474 fn recent_events_returns_newest_first() {
475 let d = AnomalyDetector::new(AnomalyConfig::default());
476 let _ = d.record_query(&obs("a", "fp1", "SELECT 1"));
477 let _ = d.record_query(&obs("a", "fp2", "SELECT 2"));
478 let _ = d.record_query(&obs("a", "fp3", "SELECT 3"));
479 let recent = d.recent_events(10);
480 match &recent[0] {
482 AnomalyEvent::NovelQuery { fingerprint, .. } => {
483 assert_eq!(fingerprint, "fp3")
484 }
485 other => panic!("expected NovelQuery fp3, got {:?}", other),
486 }
487 }
488
489 #[test]
490 fn recent_events_respects_limit() {
491 let d = AnomalyDetector::new(AnomalyConfig::default());
492 for i in 0..50 {
493 let fp = format!("fp{}", i);
494 let _ = d.record_query(&obs("a", &fp, "SELECT 1"));
495 }
496 assert_eq!(d.recent_events(10).len(), 10);
497 assert_eq!(d.recent_events(100).len(), 50);
498 }
499
500 #[test]
501 fn event_buffer_evicts_oldest_when_full() {
502 let mut cfg = AnomalyConfig::default();
503 cfg.event_buffer_size = 5;
504 let d = AnomalyDetector::new(cfg);
505 for i in 0..20 {
506 let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
507 }
508 assert_eq!(d.event_count(), 5);
512 }
513}