use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
pub mod ewma;
pub mod sql_injection;
pub use ewma::{Ewma, RateWindow};
/// Severity level attached to an anomaly event.
///
/// Serialized and deserialized using the lowercase variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
/// Informational; the level reported for `NovelQuery` events.
Info,
/// Raised when a warning-level threshold is met.
Warning,
/// Raised when a critical-level threshold is met.
Critical,
}
/// An anomaly raised by the detector.
///
/// Serialized as an internally tagged object: the `kind` field carries the
/// variant name in snake_case.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AnomalyEvent {
/// A tenant's rate window scored at or above the configured z-score threshold.
RateSpike {
/// Tenant whose rate window produced the spike.
tenant: String,
/// Rate reported by the rate window for this observation.
rate_per_sec: f64,
/// Baseline reported by the rate window.
baseline: f64,
/// Z-score reported by the rate window.
z_score: f64,
/// `Critical` at twice the spike threshold or more, otherwise `Warning`.
severity: Severity,
/// Caller-supplied timestamp string, copied from the observation.
detected_at: String,
},
/// Repeated authentication failures for one `(user, client_ip)` pair.
AuthBurst {
/// User name the failures were recorded against.
user: String,
/// Client address the failures were recorded against.
client_ip: String,
/// Number of failures currently inside the window.
failures: u32,
/// Length of the configured authentication window, in seconds.
window_secs: u32,
/// `Critical` or `Warning`, depending on which configured count was reached.
severity: Severity,
/// Caller-supplied timestamp string.
detected_at: String,
},
/// The SQL text matched at least one pattern of the injection scanner.
SqlInjection {
/// Leading part of the SQL text (at most 200 bytes, plus an ellipsis when cut).
sql_excerpt: String,
/// Values returned by `sql_injection::scan` for this SQL text.
patterns_matched: Vec<String>,
/// `Critical` when two or more patterns matched, otherwise `Warning`.
severity: Severity,
/// Caller-supplied timestamp string, copied from the observation.
detected_at: String,
},
/// A query fingerprint was observed for the first time.
NovelQuery {
/// The fingerprint that had not been seen before.
fingerprint: String,
/// Leading part of the SQL text (at most 120 bytes, plus an ellipsis when cut).
sql_excerpt: String,
/// Caller-supplied timestamp string, copied from the observation.
detected_at: String,
},
}
impl AnomalyEvent {
    /// Returns the severity associated with this event.
    ///
    /// `RateSpike`, `AuthBurst` and `SqlInjection` report the severity stored
    /// in the event itself. `NovelQuery` has no severity field and is always
    /// reported as [`Severity::Info`].
    pub fn severity(&self) -> Severity {
        match self {
            Self::NovelQuery { .. } => Severity::Info,
            Self::RateSpike { severity, .. }
            | Self::AuthBurst { severity, .. }
            | Self::SqlInjection { severity, .. } => *severity,
        }
    }
}
/// Tunable thresholds and limits for the anomaly detector.
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
/// Window length, in seconds, passed to each per-tenant rate window.
pub rate_window_secs: u64,
/// Minimum z-score at which a rate spike is reported; twice this value is reported as critical.
pub spike_z_threshold: f64,
/// Window length, in seconds, over which authentication failures are counted.
pub auth_window_secs: u64,
/// Failure count at or above which an auth burst is critical.
pub auth_critical_count: u32,
/// Failure count at or above which an auth burst is a warning.
pub auth_warning_count: u32,
/// Maximum number of events kept in the in-memory buffer before the oldest is evicted.
pub event_buffer_size: usize,
/// Whether first-seen query fingerprints produce `NovelQuery` events.
pub emit_novel_queries: bool,
}
impl Default for AnomalyConfig {
fn default() -> Self {
Self {
rate_window_secs: 60,
spike_z_threshold: 3.0,
auth_window_secs: 60,
auth_critical_count: 10,
auth_warning_count: 5,
event_buffer_size: 1024,
emit_novel_queries: true,
}
}
}
/// Stateful detector for query-rate spikes, authentication bursts, SQL
/// injection patterns and first-seen query fingerprints.
///
/// Every field is behind an `Arc`, so a clone is a handle onto the same
/// underlying state rather than an independent copy.
#[derive(Clone)]
pub struct AnomalyDetector {
// Immutable configuration shared by all clones.
config: Arc<AnomalyConfig>,
// Per-tenant rate windows, keyed by tenant name.
rate_windows: Arc<RwLock<HashMap<String, RateWindow>>>,
// Authentication failure windows, keyed by `(user, client_ip)`.
auth_windows: Arc<RwLock<HashMap<(String, String), AuthBurstWindow>>>,
// Query fingerprints already observed; only the keys carry information.
seen_fingerprints: Arc<RwLock<HashMap<String, ()>>>,
// Buffer of emitted events, oldest at the front.
events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
}
impl AnomalyDetector {
    /// Creates a detector with empty state using the given configuration.
    ///
    /// The event buffer is pre-allocated for `event_buffer_size` events, but
    /// never for more than 1024 up front, so a very large configured limit
    /// does not translate into a very large eager allocation.
    pub fn new(config: AnomalyConfig) -> Self {
        const MAX_PREALLOCATED_EVENTS: usize = 1024;
        let initial_capacity = config.event_buffer_size.min(MAX_PREALLOCATED_EVENTS);
        Self {
            config: Arc::new(config),
            rate_windows: Arc::new(RwLock::new(HashMap::new())),
            auth_windows: Arc::new(RwLock::new(HashMap::new())),
            seen_fingerprints: Arc::new(RwLock::new(HashMap::new())),
            events: Arc::new(RwLock::new(VecDeque::with_capacity(initial_capacity))),
        }
    }

    /// Feeds one query observation through every query-level check.
    ///
    /// Checks run in this order, and the returned vector keeps that order:
    ///
    /// 1. Rate spike for `ctx.tenant` (z-score at or above
    ///    `spike_z_threshold`; `Critical` at twice the threshold).
    /// 2. Novel fingerprint, when `emit_novel_queries` is enabled.
    /// 3. SQL injection patterns (`Critical` when two or more match).
    ///
    /// Every returned event is also appended to the internal event buffer.
    pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
        let mut emitted = Vec::new();

        // Build the event inside the lock scope but publish it afterwards, so
        // the rate-window lock is never held while the event lock is taken.
        let rate_spike = {
            let mut rates = self.rate_windows.write();
            let window = rates
                .entry(ctx.tenant.clone())
                .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
            let scored = window.observe_and_score(ctx.timestamp);
            match scored {
                Some(spike) if spike.z_score >= self.config.spike_z_threshold => {
                    let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
                        Severity::Critical
                    } else {
                        Severity::Warning
                    };
                    Some(AnomalyEvent::RateSpike {
                        tenant: ctx.tenant.clone(),
                        rate_per_sec: spike.rate,
                        baseline: spike.baseline,
                        z_score: spike.z_score,
                        severity,
                        detected_at: ctx.iso_timestamp.clone(),
                    })
                }
                _ => None,
            }
        };
        if let Some(ev) = rate_spike {
            self.push_event(ev.clone());
            emitted.push(ev);
        }

        if self.config.emit_novel_queries && self.mark_fingerprint_seen(&ctx.fingerprint) {
            let ev = AnomalyEvent::NovelQuery {
                fingerprint: ctx.fingerprint.clone(),
                sql_excerpt: excerpt(&ctx.sql, 120),
                detected_at: ctx.iso_timestamp.clone(),
            };
            self.push_event(ev.clone());
            emitted.push(ev);
        }

        let matches = sql_injection::scan(&ctx.sql);
        if !matches.is_empty() {
            let severity = if matches.len() >= 2 {
                Severity::Critical
            } else {
                Severity::Warning
            };
            let ev = AnomalyEvent::SqlInjection {
                sql_excerpt: excerpt(&ctx.sql, 200),
                patterns_matched: matches,
                severity,
                detected_at: ctx.iso_timestamp.clone(),
            };
            self.push_event(ev.clone());
            emitted.push(ev);
        }

        emitted
    }

    /// Records the outcome of one authentication attempt.
    ///
    /// A successful attempt discards the failure window for
    /// `(user, client_ip)` and returns `None`. A failed attempt is added to
    /// that window; once the number of failures inside the window reaches
    /// `auth_warning_count` (or `auth_critical_count`), an `AuthBurst` event
    /// is stored in the event buffer and returned. Below the warning count
    /// the result is `None`.
    pub fn record_auth(
        &self,
        user: &str,
        client_ip: &str,
        succeeded: bool,
        timestamp: Instant,
        iso_timestamp: &str,
    ) -> Option<AnomalyEvent> {
        let key = (user.to_string(), client_ip.to_string());
        if succeeded {
            self.auth_windows.write().remove(&key);
            return None;
        }

        // Only the counting happens under the lock; the event is built after.
        let count = {
            let mut windows = self.auth_windows.write();
            let window = windows
                .entry(key)
                .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
            window.record_failure(timestamp)
        };

        let severity = if count >= self.config.auth_critical_count {
            Severity::Critical
        } else if count >= self.config.auth_warning_count {
            Severity::Warning
        } else {
            return None;
        };

        // Saturate rather than wrap when the configured window exceeds u32.
        let window_secs = self.config.auth_window_secs.min(u64::from(u32::MAX)) as u32;
        let ev = AnomalyEvent::AuthBurst {
            user: user.to_string(),
            client_ip: client_ip.to_string(),
            failures: count,
            window_secs,
            severity,
            detected_at: iso_timestamp.to_string(),
        };
        self.push_event(ev.clone());
        Some(ev)
    }

    /// Returns up to `limit` buffered events, newest first.
    pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
        self.events
            .read()
            .iter()
            .rev()
            .take(limit)
            .cloned()
            .collect()
    }

    /// Returns the number of events currently held in the buffer.
    pub fn event_count(&self) -> usize {
        self.events.read().len()
    }

    /// Marks `fingerprint` as seen and reports whether it was new.
    ///
    /// Known fingerprints are answered under the shared read lock, so the
    /// exclusive lock is taken only for fingerprints that look new. The result
    /// of `insert` settles the race where two callers see the same new
    /// fingerprint at once: exactly one of them gets `true`.
    fn mark_fingerprint_seen(&self, fingerprint: &str) -> bool {
        if self.seen_fingerprints.read().contains_key(fingerprint) {
            return false;
        }
        self.seen_fingerprints
            .write()
            .insert(fingerprint.to_string(), ())
            .is_none()
    }

    /// Appends `ev` to the event buffer, evicting the oldest entries so the
    /// buffer never exceeds `event_buffer_size`.
    ///
    /// A configured size of zero means nothing is retained.
    fn push_event(&self, ev: AnomalyEvent) {
        let capacity = self.config.event_buffer_size;
        if capacity == 0 {
            return;
        }
        let mut evs = self.events.write();
        while evs.len() >= capacity {
            evs.pop_front();
        }
        evs.push_back(ev);
    }
}
/// One observed query, as passed to `AnomalyDetector::record_query`.
#[derive(Debug, Clone)]
pub struct QueryObservation {
/// Tenant the query belongs to; selects the per-tenant rate window.
pub tenant: String,
/// Query fingerprint used for novelty detection.
pub fingerprint: String,
/// SQL text; scanned for injection patterns and excerpted into events.
pub sql: String,
/// Monotonic time of the observation, fed to the rate window.
pub timestamp: Instant,
/// Timestamp string copied verbatim into each event's `detected_at`.
pub iso_timestamp: String,
}
/// Sliding window of authentication failure timestamps.
struct AuthBurstWindow {
// Maximum age a recorded failure may have before it is dropped.
window: Duration,
// Failure timestamps in insertion order, oldest at the front.
failures: VecDeque<Instant>,
}
impl AuthBurstWindow {
    /// Creates an empty window spanning `window_secs` seconds.
    fn new(window_secs: u64) -> Self {
        let window = Duration::from_secs(window_secs);
        let failures = VecDeque::new();
        Self { window, failures }
    }

    /// Records a failure at `now` and returns how many failures the window
    /// holds afterwards, including this one.
    ///
    /// Failures strictly older than the window length (relative to `now`) are
    /// discarded from the front first; one aged exactly the window length is
    /// kept.
    fn record_failure(&mut self, now: Instant) -> u32 {
        loop {
            let expired = match self.failures.front() {
                Some(&oldest) => now.duration_since(oldest) > self.window,
                None => false,
            };
            if !expired {
                break;
            }
            self.failures.pop_front();
        }
        self.failures.push_back(now);
        self.failures.len() as u32
    }
}
/// Returns `s` unchanged when it is at most `max` bytes long; otherwise
/// returns a prefix of at most `max` bytes followed by `…`.
///
/// The cut is moved back to the nearest UTF-8 character boundary, so
/// multi-byte text never causes a slicing panic and no character is split.
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Index 0 is always a boundary, so this loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an observation stamped with the current instant and a fixed
    /// ISO timestamp string.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            tenant: tenant.to_owned(),
            fingerprint: fp.to_owned(),
            sql: sql.to_owned(),
            timestamp: Instant::now(),
            iso_timestamp: String::from("2026-04-25T13:30:00Z"),
        }
    }

    /// Detector running with the default configuration.
    fn default_detector() -> AnomalyDetector {
        AnomalyDetector::new(AnomalyConfig::default())
    }

    /// True when `events` contains at least one `NovelQuery`.
    fn has_novel_query(events: &[AnomalyEvent]) -> bool {
        events
            .iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Records `attempts` failed logins for alice at `now` and returns the
    /// result of the last one.
    fn fail_logins(d: &AnomalyDetector, now: Instant, attempts: usize) -> Option<AnomalyEvent> {
        let mut last = None;
        for _ in 0..attempts {
            last = d.record_auth("alice", "10.0.0.1", false, now, "ts");
        }
        last
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let d = default_detector();
        let first = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(has_novel_query(&first));
        let second = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel_query(&second));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let cfg = AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel_query(&evs));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let d = default_detector();
        let payload = obs(
            "acme",
            "fp-inj",
            "SELECT * FROM users WHERE id = 1 OR 1=1 --",
        );
        let evs = d.record_query(&payload);
        let flagged = evs
            .iter()
            .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
        assert!(flagged, "expected SqlInjection event in {:?}", evs);
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let d = default_detector();
        match fail_logins(&d, Instant::now(), 6) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 6);
                assert_eq!(severity, Severity::Warning);
            }
            other => panic!("expected AuthBurst Warning, got {:?}", other),
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let d = default_detector();
        match fail_logins(&d, Instant::now(), 12) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 12);
                assert_eq!(severity, Severity::Critical);
            }
            other => panic!("expected AuthBurst Critical, got {:?}", other),
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let d = default_detector();
        let now = Instant::now();
        let _ = fail_logins(&d, now, 6);
        let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
        // The first failure after a success starts a fresh window.
        let after_reset = d.record_auth("alice", "10.0.0.1", false, now, "ts");
        assert!(after_reset.is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let d = default_detector();
        let queries = [("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")];
        for &(fp, sql) in queries.iter() {
            let _ = d.record_query(&obs("a", fp, sql));
        }
        let recent = d.recent_events(10);
        match &recent[0] {
            AnomalyEvent::NovelQuery { fingerprint, .. } => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let d = default_detector();
        (0..50).map(|i| format!("fp{}", i)).for_each(|fp| {
            let _ = d.record_query(&obs("a", &fp, "SELECT 1"));
        });
        assert_eq!(d.recent_events(10).len(), 10);
        assert_eq!(d.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let cfg = AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        for i in 0..20 {
            let fp = format!("fp{}", i);
            let _ = d.record_query(&obs("a", &fp, "SELECT 1"));
        }
        assert_eq!(d.event_count(), 5);
    }
}