// heliosdb_proxy/anomaly/mod.rs
1//! Anomaly detection (T3.1).
2//!
3//! Statistical + heuristic detector for production-shape security
4//! and operational anomalies. In-process sliding windows; no
5//! external data store. Four detector families:
6//!
//! 1. **Rate spike** — z-score of per-tenant queries-per-second
//!    against a rolling EWMA baseline.
//! 2. **Credential stuffing** — burst of failed auths per (user, ip)
//!    inside a sliding window (60 s by default, configurable).
//! 3. **SQL injection** — heuristic pattern match against well-known
//!    payload shapes (UNION-based, comment escapes, stacked queries,
//!    boolean blind, time-based blind).
//! 4. **Novel query** — a query fingerprint never seen before. Emitted
//!    only as an informational signal (always `Severity::Info`) because
//!    it is noisy on high-churn workloads; admins can switch it off via
//!    `AnomalyConfig::emit_novel_queries`.
18//!
19//! Why not a trained classifier today?
20//!
21//! Production anomaly classifiers want labels — feedback loops from
22//! analyst-marked false positives. Without that loop in place, a
23//! trained model overfits to whatever traffic was present at
24//! training time. Statistical detectors are honest about their
25//! priors (the EWMA + z-score) and degrade gracefully. The
26//! [`AnomalyEvent`] trail makes it possible to bolt a learned
27//! classifier on later: events become labeled training data.
28
29use std::collections::{HashMap, VecDeque};
30use std::sync::Arc;
31use std::time::{Duration, Instant};
32
33use parking_lot::RwLock;
34use serde::{Deserialize, Serialize};
35
36pub mod ewma;
37pub mod sql_injection;
38
39pub use ewma::{Ewma, RateWindow};
40
41/// Anomaly severity — surfaces in admin output and lets operators
42/// filter detections at scale.
43#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
44#[serde(rename_all = "lowercase")]
45pub enum Severity {
46    Info,
47    Warning,
48    Critical,
49}
50
/// One anomaly detection event.
///
/// Serialized internally tagged: a `"kind"` field carries the snake_case
/// variant name (`"rate_spike"`, `"auth_burst"`, `"sql_injection"`,
/// `"novel_query"`) next to the variant's own fields. Field declaration
/// order is also the serialized order, so keep it stable.
///
/// Every `detected_at` is the pre-formatted timestamp string supplied by
/// the caller (`QueryObservation::iso_timestamp`, or the `iso_timestamp`
/// argument of `AnomalyDetector::record_auth`), copied verbatim.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AnomalyEvent {
    /// Per-tenant request-rate spike against the rolling baseline.
    RateSpike {
        /// Tenant whose rate spiked (`QueryObservation::tenant`).
        tenant: String,
        /// Observed rate, as reported by the tenant's `RateWindow`.
        rate_per_sec: f64,
        /// Rolling baseline the rate was compared against.
        baseline: f64,
        /// Z-score of the observed rate; at least `spike_z_threshold`.
        z_score: f64,
        /// Warning, or Critical once the z-score reaches twice
        /// `spike_z_threshold`.
        severity: Severity,
        /// Caller-supplied timestamp string.
        detected_at: String,
    },
    /// Failed-auth burst from a single (user, ip) pair.
    AuthBurst {
        /// User name passed to `record_auth`.
        user: String,
        /// Client address passed to `record_auth`.
        client_ip: String,
        /// Failures for this pair currently inside the window, including
        /// the one that produced this event.
        failures: u32,
        /// Configured auth window length, in seconds.
        window_secs: u32,
        /// Warning from `auth_warning_count` failures, Critical from
        /// `auth_critical_count` (both inclusive).
        severity: Severity,
        /// Caller-supplied timestamp string.
        detected_at: String,
    },
    /// SQL-injection-shaped statement matched one or more
    /// well-known payload patterns.
    SqlInjection {
        /// Leading portion of the raw SQL (about 200 bytes, `…`-suffixed
        /// when truncated).
        sql_excerpt: String,
        /// Matches reported by `sql_injection::scan`; never empty.
        patterns_matched: Vec<String>,
        /// Critical when two or more patterns matched, otherwise Warning.
        severity: Severity,
        /// Caller-supplied timestamp string.
        detected_at: String,
    },
    /// First-seen query fingerprint. Informational by default.
    ///
    /// Carries no severity field; `AnomalyEvent::severity` reports Info.
    NovelQuery {
        /// The fingerprint that had not been seen before.
        fingerprint: String,
        /// Leading portion of the raw SQL (about 120 bytes, `…`-suffixed
        /// when truncated).
        sql_excerpt: String,
        /// Caller-supplied timestamp string.
        detected_at: String,
    },
}
88
89impl AnomalyEvent {
90    pub fn severity(&self) -> Severity {
91        match self {
92            AnomalyEvent::RateSpike { severity, .. } => *severity,
93            AnomalyEvent::AuthBurst { severity, .. } => *severity,
94            AnomalyEvent::SqlInjection { severity, .. } => *severity,
95            AnomalyEvent::NovelQuery { .. } => Severity::Info,
96        }
97    }
98}
99
/// Detector tunables.
///
/// [`Default`] gives production-friendly values:
/// - a rate spike fires at a z-score of 3.0 or more, and is Critical from 6.0;
/// - failed auths are judged over a 60 s window, with Warning from 5
///   failures and Critical from 10.
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
    /// Rolling window for the per-tenant EWMA, in seconds.
    pub rate_window_secs: u64,
    /// Minimum z-score before a rate spike fires; twice this value
    /// escalates the spike to Critical.
    pub spike_z_threshold: f64,
    /// Window for failed-auth bursts, in seconds.
    pub auth_window_secs: u64,
    /// Failures inside the auth window (inclusive) that trigger Critical.
    pub auth_critical_count: u32,
    /// Failures inside the auth window (inclusive) that trigger Warning.
    pub auth_warning_count: u32,
    /// Maximum events kept in the in-memory ring buffer; the oldest are
    /// evicted first.
    pub event_buffer_size: usize,
    /// Emit first-seen query fingerprints as informational events. Set
    /// false to suppress them on high-churn workloads (e.g. ad-hoc analytics).
    pub emit_novel_queries: bool,
}

impl Default for AnomalyConfig {
    fn default() -> Self {
        // Both sliding windows default to one minute.
        const ONE_MINUTE_SECS: u64 = 60;

        AnomalyConfig {
            emit_novel_queries: true,
            event_buffer_size: 1024,
            auth_warning_count: 5,
            auth_critical_count: 10,
            auth_window_secs: ONE_MINUTE_SECS,
            spike_z_threshold: 3.0,
            rate_window_secs: ONE_MINUTE_SECS,
        }
    }
}
134
/// Top-level detector. Cheap to clone via Arc — the inner state is
/// guarded by parking_lot RwLocks scoped per-detector to avoid
/// cross-detector contention.
///
/// Every field is an `Arc`, so clones share the same configuration,
/// windows and event buffer.
///
/// NOTE(review): the keyed maps below are never pruned by the code in
/// this module, except for `auth_windows` entries removed on a successful
/// auth. Their size therefore grows with the number of distinct tenants,
/// (user, ip) pairs and fingerprints observed. Consider bounding them if
/// those keys can be attacker-controlled.
#[derive(Clone)]
pub struct AnomalyDetector {
    // Shared, immutable tunables.
    config: Arc<AnomalyConfig>,
    // Per-tenant rate windows feeding the rate-spike detector.
    rate_windows: Arc<RwLock<HashMap<String, RateWindow>>>,
    // Failed-auth windows keyed by (user, client_ip).
    auth_windows: Arc<RwLock<HashMap<(String, String), AuthBurstWindow>>>,
    // Fingerprints already reported as novel; used as a set (value is `()`).
    seen_fingerprints: Arc<RwLock<HashMap<String, ()>>>,
    // Ring buffer of recent events: pushed at the back, evicted from the front.
    events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
}
146
147impl AnomalyDetector {
148    pub fn new(config: AnomalyConfig) -> Self {
149        Self {
150            config: Arc::new(config),
151            rate_windows: Arc::new(RwLock::new(HashMap::new())),
152            auth_windows: Arc::new(RwLock::new(HashMap::new())),
153            seen_fingerprints: Arc::new(RwLock::new(HashMap::new())),
154            events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
155        }
156    }
157
158    /// Record a query event. Detectors may emit zero or more events.
159    /// Returns the events emitted by THIS call (the caller can also
160    /// poll `recent_events` for the full ring buffer).
161    pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
162        let mut emitted = Vec::new();
163
164        // Rate-spike detector.
165        let mut rates = self.rate_windows.write();
166        let window = rates
167            .entry(ctx.tenant.clone())
168            .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
169        if let Some(spike) = window.observe_and_score(ctx.timestamp) {
170            if spike.z_score >= self.config.spike_z_threshold {
171                let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
172                    Severity::Critical
173                } else {
174                    Severity::Warning
175                };
176                let ev = AnomalyEvent::RateSpike {
177                    tenant: ctx.tenant.clone(),
178                    rate_per_sec: spike.rate,
179                    baseline: spike.baseline,
180                    z_score: spike.z_score,
181                    severity,
182                    detected_at: ctx.iso_timestamp.clone(),
183                };
184                emitted.push(ev.clone());
185                self.push_event(ev);
186            }
187        }
188        drop(rates);
189
190        // Novel-query detector.
191        if self.config.emit_novel_queries {
192            let mut seen = self.seen_fingerprints.write();
193            if !seen.contains_key(&ctx.fingerprint) {
194                seen.insert(ctx.fingerprint.clone(), ());
195                let ev = AnomalyEvent::NovelQuery {
196                    fingerprint: ctx.fingerprint.clone(),
197                    sql_excerpt: excerpt(&ctx.sql, 120),
198                    detected_at: ctx.iso_timestamp.clone(),
199                };
200                emitted.push(ev.clone());
201                self.push_event(ev);
202            }
203        }
204
205        // SQL-injection detector. Pure heuristic — runs even if the
206        // upstream pre-query already passed; multiple layers is the
207        // point.
208        let matches = sql_injection::scan(&ctx.sql);
209        if !matches.is_empty() {
210            let severity = if matches.len() >= 2 {
211                Severity::Critical
212            } else {
213                Severity::Warning
214            };
215            let ev = AnomalyEvent::SqlInjection {
216                sql_excerpt: excerpt(&ctx.sql, 200),
217                patterns_matched: matches,
218                severity,
219                detected_at: ctx.iso_timestamp.clone(),
220            };
221            emitted.push(ev.clone());
222            self.push_event(ev);
223        }
224
225        emitted
226    }
227
228    /// Record an authentication outcome. Failed auths feed the
229    /// credential-stuffing detector.
230    pub fn record_auth(
231        &self,
232        user: &str,
233        client_ip: &str,
234        succeeded: bool,
235        timestamp: Instant,
236        iso_timestamp: &str,
237    ) -> Option<AnomalyEvent> {
238        if succeeded {
239            // Successful auth resets the burst counter — common
240            // case after the operator unlocks an account.
241            self.auth_windows
242                .write()
243                .remove(&(user.to_string(), client_ip.to_string()));
244            return None;
245        }
246        let mut windows = self.auth_windows.write();
247        let window = windows
248            .entry((user.to_string(), client_ip.to_string()))
249            .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
250        let count = window.record_failure(timestamp);
251        let severity = if count >= self.config.auth_critical_count {
252            Severity::Critical
253        } else if count >= self.config.auth_warning_count {
254            Severity::Warning
255        } else {
256            return None;
257        };
258        let ev = AnomalyEvent::AuthBurst {
259            user: user.to_string(),
260            client_ip: client_ip.to_string(),
261            failures: count,
262            window_secs: self.config.auth_window_secs as u32,
263            severity,
264            detected_at: iso_timestamp.to_string(),
265        };
266        drop(windows);
267        self.push_event(ev.clone());
268        Some(ev)
269    }
270
271    /// Snapshot of the most recent events. Newest first.
272    pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
273        let evs = self.events.read();
274        let n = limit.min(evs.len());
275        let mut out = Vec::with_capacity(n);
276        for ev in evs.iter().rev().take(n) {
277            out.push(ev.clone());
278        }
279        out
280    }
281
282    /// Total events ever recorded (since process start). Useful for
283    /// metrics export.
284    pub fn event_count(&self) -> usize {
285        self.events.read().len()
286    }
287
288    fn push_event(&self, ev: AnomalyEvent) {
289        let mut evs = self.events.write();
290        if evs.len() >= self.config.event_buffer_size {
291            evs.pop_front();
292        }
293        evs.push_back(ev);
294    }
295}
296
/// Per-query observation passed to the detector. Built by the proxy
/// at hook time; populated as much as the proxy knows about the
/// query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueryObservation {
    /// Tenant identifier (or "default" / "" when no multi-tenancy).
    /// Keys the per-tenant rate window.
    pub tenant: String,
    /// Canonical query fingerprint (literals normalised). Same shape
    /// the analytics module produces. Keys novel-query detection.
    pub fingerprint: String,
    /// Raw SQL — used for SQL-injection scanning + UI excerpt.
    pub sql: String,
    /// Arrival time of the query on the monotonic clock; detectors
    /// compute rates against it.
    ///
    /// An `Instant` is not wall-clock time and cannot be rendered as a
    /// date, which is why `iso_timestamp` is carried separately.
    pub timestamp: Instant,
    /// Pre-formatted RFC 3339 timestamp the proxy already has;
    /// detectors copy it into events rather than re-format.
    pub iso_timestamp: String,
}
316
/// Sliding window of failed-auth timestamps for one (user, ip) pair.
///
/// The window length is `window_secs`; the detector passes
/// `AnomalyConfig::auth_window_secs`, which defaults to 60 s. Entries
/// older than the window are evicted each time a new failure is recorded.
struct AuthBurstWindow {
    // Maximum age a failure may have and still be counted.
    window: Duration,
    // Failure timestamps, oldest at the front.
    failures: VecDeque<Instant>,
}

impl AuthBurstWindow {
    fn new(window_secs: u64) -> Self {
        Self {
            window: Duration::from_secs(window_secs),
            failures: VecDeque::new(),
        }
    }

    /// Record a failure at `now` and return how many failures, including
    /// this one, fall inside the window.
    ///
    /// An entry exactly `window` old is still counted. Eviction stops at
    /// the first entry young enough to keep, so timestamps are expected to
    /// arrive in non-decreasing order.
    fn record_failure(&mut self, now: Instant) -> u32 {
        let window = self.window;
        while let Some(&oldest) = self.failures.front() {
            if now.duration_since(oldest) <= window {
                break;
            }
            self.failures.pop_front();
        }
        self.failures.push_back(now);
        // Saturate rather than truncate if the count ever exceeds u32::MAX.
        self.failures.len().min(u32::MAX as usize) as u32
    }
}
345
/// Return at most the first `max` bytes of `s`, appending `…` when
/// anything was cut off.
///
/// The cut point is moved back to the nearest UTF-8 character boundary,
/// so multi-byte characters in the input (accented literals, emoji, CJK)
/// never cause a slicing panic. The kept prefix may therefore be a few
/// bytes shorter than `max`.
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Here `max < s.len()`, so `max` is a valid probe. Index 0 is always a
    // char boundary, so the back-off loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
353
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed RFC 3339 stamp attached to every observation in these tests.
    const TS: &str = "2026-04-25T13:30:00Z";

    /// Build an observation stamped with the current instant.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            tenant: tenant.to_owned(),
            fingerprint: fp.to_owned(),
            sql: sql.to_owned(),
            timestamp: Instant::now(),
            iso_timestamp: TS.to_owned(),
        }
    }

    /// True when any event in `evs` is a `NovelQuery`.
    fn has_novel(evs: &[AnomalyEvent]) -> bool {
        evs.iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Feed `n` failed auths for alice@10.0.0.1 at `at`; return the last result.
    fn fail_alice(d: &AnomalyDetector, n: usize, at: Instant) -> Option<AnomalyEvent> {
        let mut last = None;
        for _ in 0..n {
            last = d.record_auth("alice", "10.0.0.1", false, at, "ts");
        }
        last
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        let first = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(has_novel(&first));
        let second = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel(&second));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let cfg = AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        let evs = d.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!has_novel(&evs));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        let payload = "SELECT * FROM users WHERE id = 1 OR 1=1 --";
        let evs = d.record_query(&obs("acme", "fp-inj", payload));
        let flagged = evs
            .iter()
            .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
        assert!(flagged, "expected SqlInjection event in {:?}", evs);
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&d, 6, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 6);
                assert_eq!(severity, Severity::Warning);
            }
            other => panic!("expected AuthBurst Warning, got {:?}", other),
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&d, 12, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 12);
                assert_eq!(severity, Severity::Critical);
            }
            other => panic!("expected AuthBurst Critical, got {:?}", other),
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        let now = Instant::now();
        let _ = fail_alice(&d, 6, now);
        // Successful auth clears the window — next failure starts at 1.
        let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
        // 1 failure is below the warning threshold (5) — None.
        assert!(fail_alice(&d, 1, now).is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        for (fp, sql) in [("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")].iter() {
            let _ = d.record_query(&obs("a", fp, sql));
        }
        let recent = d.recent_events(10);
        // First event in `recent` is the newest novel-query (fp3).
        match recent.first() {
            Some(AnomalyEvent::NovelQuery { fingerprint, .. }) => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        (0..50).for_each(|i| {
            let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        });
        assert_eq!(d.recent_events(10).len(), 10);
        assert_eq!(d.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let cfg = AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        for i in 0..20 {
            let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        }
        // The buffer holds at most 5. `event_count` reports the current
        // buffer size, not a lifetime count.
        assert_eq!(d.event_count(), 5);
    }
}