Skip to main content

heliosdb_proxy/anomaly/
mod.rs

//! Anomaly detection (T3.1).
//!
//! Statistical + heuristic detector for production-shape security
//! and operational anomalies. In-process sliding windows; no
//! external data store. Four detector families:
//!
//! 1. **Rate spike** — z-score on per-tenant queries-per-second
//!    against a rolling EWMA baseline.
//! 2. **Credential stuffing** — failed-auth burst per (user, ip)
//!    inside a sliding window (60s by default).
//! 3. **SQL injection** — heuristic pattern match against well-known
//!    payload shapes (UNION-based, comment escapes, stacked queries,
//!    boolean blind, time-based blind).
//! 4. **Novel query** — query fingerprint never seen before. On
//!    high-churn application workloads this is useful only as an
//!    informational signal, so it is always reported at `Info`
//!    severity; admins can suppress it entirely via
//!    `AnomalyConfig::emit_novel_queries`.
//! Why not a trained classifier today?
//!
//! Production anomaly classifiers need labels — feedback loops from
//! analyst-marked false positives. Without that loop in place, a
//! trained model overfits to whatever traffic was present at
//! training time. Statistical detectors are honest about their
//! priors (the EWMA + z-score) and degrade gracefully. The
//! [`AnomalyEvent`] trail makes it possible to bolt a learned
//! classifier on later: once analysts label them, recorded events
//! become training data.
28
29use std::collections::VecDeque;
30
31use dashmap::DashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36use serde::{Deserialize, Serialize};
37
38pub mod ewma;
39pub mod sql_injection;
40
41pub use ewma::{Ewma, RateWindow};
42
43/// Anomaly severity — surfaces in admin output and lets operators
44/// filter detections at scale.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "lowercase")]
47pub enum Severity {
48    Info,
49    Warning,
50    Critical,
51}
52
53/// One anomaly detection event.
54#[derive(Debug, Clone, Serialize)]
55#[serde(tag = "kind", rename_all = "snake_case")]
56pub enum AnomalyEvent {
57    /// Per-tenant request-rate spike against the rolling baseline.
58    RateSpike {
59        tenant: String,
60        rate_per_sec: f64,
61        baseline: f64,
62        z_score: f64,
63        severity: Severity,
64        detected_at: String,
65    },
66    /// Failed-auth burst from a single (user, ip) pair.
67    AuthBurst {
68        user: String,
69        client_ip: String,
70        failures: u32,
71        window_secs: u32,
72        severity: Severity,
73        detected_at: String,
74    },
75    /// SQL-injection-shaped statement matched one or more
76    /// well-known payload patterns.
77    SqlInjection {
78        sql_excerpt: String,
79        patterns_matched: Vec<String>,
80        severity: Severity,
81        detected_at: String,
82    },
83    /// First-seen query fingerprint. Informational by default.
84    NovelQuery {
85        fingerprint: String,
86        sql_excerpt: String,
87        detected_at: String,
88    },
89}
90
91impl AnomalyEvent {
92    pub fn severity(&self) -> Severity {
93        match self {
94            AnomalyEvent::RateSpike { severity, .. } => *severity,
95            AnomalyEvent::AuthBurst { severity, .. } => *severity,
96            AnomalyEvent::SqlInjection { severity, .. } => *severity,
97            AnomalyEvent::NovelQuery { .. } => Severity::Info,
98        }
99    }
100}
101
/// Detector tunables.
///
/// With the defaults, a rate spike fires at a z-score of 3.0 or more
/// (Critical at twice that). A credential burst fires at 5 failed logins
/// within 60 seconds (Critical at 10).
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
    /// Length, in seconds, of each tenant's rolling rate window.
    pub rate_window_secs: u64,
    /// Smallest z-score that raises a rate spike. Scores at twice this
    /// value are reported as Critical.
    pub spike_z_threshold: f64,
    /// Length, in seconds, of the sliding window over failed logins for
    /// each (user, client IP) pair.
    pub auth_window_secs: u64,
    /// Failures inside the auth window at which a burst becomes Critical.
    pub auth_critical_count: u32,
    /// Failures inside the auth window at which a burst becomes a Warning.
    pub auth_warning_count: u32,
    /// Maximum number of events retained in the in-memory ring buffer.
    /// The oldest events are evicted first.
    pub event_buffer_size: usize,
    /// Whether first-seen query fingerprints are reported as
    /// informational events. Disable on high-churn workloads (e.g. ad-hoc
    /// analytics).
    pub emit_novel_queries: bool,
}

impl Default for AnomalyConfig {
    fn default() -> Self {
        const ONE_MINUTE_SECS: u64 = 60;
        Self {
            // Rate-spike detector.
            rate_window_secs: ONE_MINUTE_SECS,
            spike_z_threshold: 3.0,
            // Credential-burst detector.
            auth_window_secs: ONE_MINUTE_SECS,
            auth_warning_count: 5,
            auth_critical_count: 10,
            // Event retention and novel-query reporting.
            event_buffer_size: 1024,
            emit_novel_queries: true,
        }
    }
}
136
137/// Top-level detector. Cheap to clone via Arc — the inner state is
138/// guarded by parking_lot RwLocks scoped per-detector to avoid
139/// cross-detector contention.
140#[derive(Clone)]
141pub struct AnomalyDetector {
142    config: Arc<AnomalyConfig>,
143    // Per-key sliding windows live in DashMaps so concurrent sessions for
144    // different tenants/users hit different shards instead of serializing
145    // on one global writer per query.
146    rate_windows: Arc<DashMap<String, RateWindow>>,
147    auth_windows: Arc<DashMap<(String, String), AuthBurstWindow>>,
148    seen_fingerprints: Arc<DashMap<String, ()>>,
149    events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
150}
151
/// Capacity of the novel-query fingerprint set.
///
/// Once the set holds this many entries it is cleared, so workloads with
/// highly variable SQL cannot grow it forever (a slow memory leak). After a
/// clear, earlier fingerprints may be reported as novel again. That is
/// acceptable because the detector is informational only.
const MAX_SEEN_FINGERPRINTS: usize = 100 * 1_000;
156
157impl AnomalyDetector {
158    pub fn new(config: AnomalyConfig) -> Self {
159        Self {
160            config: Arc::new(config),
161            rate_windows: Arc::new(DashMap::new()),
162            auth_windows: Arc::new(DashMap::new()),
163            seen_fingerprints: Arc::new(DashMap::new()),
164            events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
165        }
166    }
167
168    /// Record a query event. Detectors may emit zero or more events.
169    /// Returns the events emitted by THIS call (the caller can also
170    /// poll `recent_events` for the full ring buffer).
171    pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
172        let mut emitted = Vec::new();
173
174        // Rate-spike detector. Per-tenant shard lock only.
175        {
176            let mut window = self
177                .rate_windows
178                .entry(ctx.tenant.clone())
179                .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
180            if let Some(spike) = window.observe_and_score(ctx.timestamp) {
181                if spike.z_score >= self.config.spike_z_threshold {
182                    let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
183                        Severity::Critical
184                    } else {
185                        Severity::Warning
186                    };
187                    let ev = AnomalyEvent::RateSpike {
188                        tenant: ctx.tenant.clone(),
189                        rate_per_sec: spike.rate,
190                        baseline: spike.baseline,
191                        z_score: spike.z_score,
192                        severity,
193                        detected_at: chrono::Utc::now().to_rfc3339(),
194                    };
195                    drop(window); // release shard before pushing to the event ring
196                    emitted.push(ev.clone());
197                    self.push_event(ev);
198                }
199            }
200        }
201
202        // Novel-query detector. The common already-seen path takes only a
203        // shard read; a fresh fingerprint upgrades to a shard write.
204        if self.config.emit_novel_queries && !self.seen_fingerprints.contains_key(&ctx.fingerprint)
205        {
206            // Bound the set so unique-SQL traffic can't leak memory.
207            if self.seen_fingerprints.len() >= MAX_SEEN_FINGERPRINTS {
208                self.seen_fingerprints.clear();
209            }
210            // insert returns the prior value; None means we won the race to
211            // first-see this fingerprint, so emit exactly once.
212            if self
213                .seen_fingerprints
214                .insert(ctx.fingerprint.clone(), ())
215                .is_none()
216            {
217                let ev = AnomalyEvent::NovelQuery {
218                    fingerprint: ctx.fingerprint.clone(),
219                    sql_excerpt: excerpt(&ctx.sql, 120),
220                    detected_at: chrono::Utc::now().to_rfc3339(),
221                };
222                emitted.push(ev.clone());
223                self.push_event(ev);
224            }
225        }
226
227        // SQL-injection detector. Pure heuristic — runs even if the
228        // upstream pre-query already passed; multiple layers is the
229        // point.
230        let matches = sql_injection::scan(&ctx.sql);
231        if !matches.is_empty() {
232            let severity = if matches.len() >= 2 {
233                Severity::Critical
234            } else {
235                Severity::Warning
236            };
237            let ev = AnomalyEvent::SqlInjection {
238                sql_excerpt: excerpt(&ctx.sql, 200),
239                patterns_matched: matches,
240                severity,
241                detected_at: chrono::Utc::now().to_rfc3339(),
242            };
243            emitted.push(ev.clone());
244            self.push_event(ev);
245        }
246
247        emitted
248    }
249
250    /// Record an authentication outcome. Failed auths feed the
251    /// credential-stuffing detector.
252    pub fn record_auth(
253        &self,
254        user: &str,
255        client_ip: &str,
256        succeeded: bool,
257        timestamp: Instant,
258        iso_timestamp: &str,
259    ) -> Option<AnomalyEvent> {
260        if succeeded {
261            // Successful auth resets the burst counter — common
262            // case after the operator unlocks an account.
263            self.auth_windows
264                .remove(&(user.to_string(), client_ip.to_string()));
265            return None;
266        }
267        let count = {
268            let mut window = self
269                .auth_windows
270                .entry((user.to_string(), client_ip.to_string()))
271                .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
272            window.record_failure(timestamp)
273        };
274        let severity = if count >= self.config.auth_critical_count {
275            Severity::Critical
276        } else if count >= self.config.auth_warning_count {
277            Severity::Warning
278        } else {
279            return None;
280        };
281        let ev = AnomalyEvent::AuthBurst {
282            user: user.to_string(),
283            client_ip: client_ip.to_string(),
284            failures: count,
285            window_secs: self.config.auth_window_secs as u32,
286            severity,
287            detected_at: iso_timestamp.to_string(),
288        };
289        self.push_event(ev.clone());
290        Some(ev)
291    }
292
293    /// Snapshot of the most recent events. Newest first.
294    pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
295        let evs = self.events.read();
296        let n = limit.min(evs.len());
297        let mut out = Vec::with_capacity(n);
298        for ev in evs.iter().rev().take(n) {
299            out.push(ev.clone());
300        }
301        out
302    }
303
304    /// Total events ever recorded (since process start). Useful for
305    /// metrics export.
306    pub fn event_count(&self) -> usize {
307        self.events.read().len()
308    }
309
310    fn push_event(&self, ev: AnomalyEvent) {
311        let mut evs = self.events.write();
312        if evs.len() >= self.config.event_buffer_size {
313            evs.pop_front();
314        }
315        evs.push_back(ev);
316    }
317}
318
/// Per-query observation passed to the detector. Built by the proxy
/// at hook time; populated as much as the proxy knows about the
/// query.
#[derive(Debug, Clone)]
pub struct QueryObservation {
    /// Tenant identifier (or "default" / "" when no multi-tenancy).
    pub tenant: String,
    /// Canonical query fingerprint (literals normalised). Same shape
    /// the analytics module produces.
    pub fingerprint: String,
    /// Raw SQL — used for SQL-injection scanning + UI excerpt.
    pub sql: String,
    /// Arrival time of the query, as a monotonic `Instant` (not wall-clock
    /// time). The rate-spike detector computes rates against it.
    pub timestamp: Instant,
}
335
/// Sliding window of failed-auth timestamps for one `(user, client_ip)`
/// pair.
///
/// The window length comes from `AnomalyConfig::auth_window_secs` (60s by
/// default). Expired entries are evicted lazily, on each `record_failure`
/// call.
struct AuthBurstWindow {
    /// Maximum age a failure may have and still count toward the burst.
    window: Duration,
    /// Failure instants in arrival order, oldest at the front.
    failures: VecDeque<Instant>,
}

impl AuthBurstWindow {
    /// Creates an empty window covering the last `window_secs` seconds.
    fn new(window_secs: u64) -> Self {
        Self {
            window: Duration::from_secs(window_secs),
            failures: VecDeque::new(),
        }
    }

    /// Records a failure at `now` and returns how many failures fall inside
    /// the window, including this one.
    ///
    /// Entries more than `window` older than `now` are evicted first. An
    /// entry exactly `window` old still counts.
    fn record_failure(&mut self, now: Instant) -> u32 {
        while let Some(&oldest) = self.failures.front() {
            if now.duration_since(oldest) <= self.window {
                break;
            }
            self.failures.pop_front();
        }
        self.failures.push_back(now);
        // Saturate instead of silently truncating if the count ever
        // exceeds u32::MAX.
        self.failures.len().min(u32::MAX as usize) as u32
    }
}
364
/// Truncates `s` to at most `max` bytes for display, appending `…` when
/// anything was cut.
///
/// The cut point is moved back to the nearest UTF-8 character boundary.
/// Multi-byte characters in the input (e.g. non-ASCII literals in
/// untrusted SQL) therefore never cause a slicing panic.
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Index 0 is always a char boundary, so this loop terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
372
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an observation stamped with the current instant.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            timestamp: Instant::now(),
            sql: sql.to_owned(),
            fingerprint: fp.to_owned(),
            tenant: tenant.to_owned(),
        }
    }

    /// True when any of `evs` is a `NovelQuery` event.
    fn contains_novel(evs: &[AnomalyEvent]) -> bool {
        evs.iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Reports `n` failed logins for alice@10.0.0.1 at instant `at` and
    /// returns the result of the last one (`None` when `n == 0`).
    fn fail_repeatedly(d: &AnomalyDetector, n: usize, at: Instant) -> Option<AnomalyEvent> {
        (0..n)
            .map(|_| d.record_auth("alice", "10.0.0.1", false, at, "ts"))
            .last()
            .flatten()
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let first = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(contains_novel(&first));
        let second = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!contains_novel(&second));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let detector = AnomalyDetector::new(AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        });
        let evs = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!contains_novel(&evs));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let payload = "SELECT * FROM users WHERE id = 1 OR 1=1 --";
        let evs = detector.record_query(&obs("acme", "fp-inj", payload));
        let flagged = evs
            .iter()
            .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
        assert!(flagged, "expected SqlInjection event in {:?}", evs);
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let last = fail_repeatedly(&detector, 6, Instant::now());
        if let Some(AnomalyEvent::AuthBurst {
            failures, severity, ..
        }) = &last
        {
            assert_eq!(*failures, 6);
            assert_eq!(*severity, Severity::Warning);
        } else {
            panic!("expected AuthBurst Warning, got {:?}", last);
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let last = fail_repeatedly(&detector, 12, Instant::now());
        if let Some(AnomalyEvent::AuthBurst {
            failures, severity, ..
        }) = &last
        {
            assert_eq!(*failures, 12);
            assert_eq!(*severity, Severity::Critical);
        } else {
            panic!("expected AuthBurst Critical, got {:?}", last);
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let now = Instant::now();
        let _ = fail_repeatedly(&detector, 6, now);
        // A successful login clears the window, so the next failure counts
        // as 1 — below the warning threshold (5), hence no event.
        let _ = detector.record_auth("alice", "10.0.0.1", true, now, "ts");
        let after_reset = detector.record_auth("alice", "10.0.0.1", false, now, "ts");
        assert!(after_reset.is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        for &(fp, sql) in [("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")].iter() {
            let _ = detector.record_query(&obs("a", fp, sql));
        }
        let recent = detector.recent_events(10);
        // The newest novel-query event (fp3) must come first.
        match &recent[0] {
            AnomalyEvent::NovelQuery { fingerprint, .. } => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        (0..50).for_each(|i| {
            let _ = detector.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        });
        assert_eq!(detector.recent_events(10).len(), 10);
        assert_eq!(detector.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let detector = AnomalyDetector::new(AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        });
        (0..20).for_each(|i| {
            let _ = detector.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        });
        // event_count reports current buffer occupancy (capped at 5), not
        // a lifetime total.
        assert_eq!(detector.event_count(), 5);
    }
}