Skip to main content

heliosdb_proxy/anomaly/
mod.rs

1//! Anomaly detection (T3.1).
2//!
3//! Statistical + heuristic detector for production-shape security
4//! and operational anomalies. In-process sliding windows; no
5//! external data store. Four detector families:
6//!
7//! 1. **Rate spike** — z-score on per-tenant queries-per-second
8//!    against a rolling EWMA baseline.
//! 2. **Credential stuffing** — failed-auth burst per (user, ip)
//!    inside a sliding window (`auth_window_secs`, 60s by default).
11//! 3. **SQL injection** — heuristic pattern match against well-known
12//!    payload shapes (UNION-based, comment escapes, stacked queries,
13//!    boolean blind, time-based blind).
//! 4. **Novel query** — a query fingerprint not seen before. Always
//!    reported at `Info` severity; on high-churn workloads (e.g. ad-hoc
//!    analytics) it is noisy, so it can be switched off via
//!    `AnomalyConfig::emit_novel_queries`.
18//!
19//! Why not a trained classifier today?
20//!
21//! Production anomaly classifiers want labels — feedback loops from
22//! analyst-marked false positives. Without that loop in place, a
23//! trained model overfits to whatever traffic was present at
24//! training time. Statistical detectors are honest about their
25//! priors (the EWMA + z-score) and degrade gracefully. The
26//! [`AnomalyEvent`] trail makes it possible to bolt a learned
27//! classifier on later: events become labeled training data.
28
29use std::collections::VecDeque;
30
31use dashmap::DashMap;
32use std::sync::Arc;
33use std::time::{Duration, Instant};
34
35use parking_lot::RwLock;
36use serde::{Deserialize, Serialize};
37
38pub mod ewma;
39pub mod sql_injection;
40
41pub use ewma::{Ewma, RateWindow};
42
43/// Anomaly severity — surfaces in admin output and lets operators
44/// filter detections at scale.
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
46#[serde(rename_all = "lowercase")]
47pub enum Severity {
48    Info,
49    Warning,
50    Critical,
51}
52
/// One anomaly detection event.
///
/// Serializes as an internally tagged object: the variant name, in
/// snake_case, goes in a `"kind"` field (e.g. `"kind": "rate_spike"`)
/// alongside the variant's fields.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AnomalyEvent {
    /// Per-tenant request-rate spike against the rolling baseline.
    RateSpike {
        /// Tenant whose rate window produced the spike.
        tenant: String,
        /// Observed rate reported by the tenant's `RateWindow`.
        rate_per_sec: f64,
        /// Rolling (EWMA) baseline the rate was scored against.
        baseline: f64,
        /// z-score of `rate_per_sec` against `baseline`.
        z_score: f64,
        /// `Warning` at or above `spike_z_threshold`, `Critical` at or
        /// above twice that threshold.
        severity: Severity,
        /// RFC 3339 UTC timestamp taken when the event was built.
        detected_at: String,
    },
    /// Failed-auth burst from a single (user, ip) pair.
    AuthBurst {
        user: String,
        client_ip: String,
        /// Failures currently inside the window, including the one that
        /// triggered this event.
        failures: u32,
        /// Length of the auth window (`auth_window_secs`).
        window_secs: u32,
        /// `Warning` at `auth_warning_count`, `Critical` at
        /// `auth_critical_count`.
        severity: Severity,
        /// Caller-supplied timestamp string, copied verbatim
        /// (presumably ISO 8601 — confirm at call sites).
        detected_at: String,
    },
    /// SQL-injection-shaped statement matched one or more
    /// well-known payload patterns.
    SqlInjection {
        /// Up to the first 200 bytes of the SQL, with `…` appended when
        /// truncated.
        sql_excerpt: String,
        /// Matches reported by `sql_injection::scan` (presumably pattern
        /// names — see that module).
        patterns_matched: Vec<String>,
        /// `Critical` when two or more patterns matched, else `Warning`.
        severity: Severity,
        /// RFC 3339 UTC timestamp taken when the event was built.
        detected_at: String,
    },
    /// First-seen query fingerprint. Always `Severity::Info`.
    NovelQuery {
        fingerprint: String,
        /// Up to the first 120 bytes of the SQL, with `…` appended when
        /// truncated.
        sql_excerpt: String,
        /// RFC 3339 UTC timestamp taken when the event was built.
        detected_at: String,
    },
}
90
91impl AnomalyEvent {
92    pub fn severity(&self) -> Severity {
93        match self {
94            AnomalyEvent::RateSpike { severity, .. } => *severity,
95            AnomalyEvent::AuthBurst { severity, .. } => *severity,
96            AnomalyEvent::SqlInjection { severity, .. } => *severity,
97            AnomalyEvent::NovelQuery { .. } => Severity::Info,
98        }
99    }
100}
101
/// Detector tunables.
///
/// The defaults aim for production-friendly behaviour: a rate spike fires
/// at z ≥ 3, and a single (user, ip) pair escalates to Warning at 5 and
/// Critical at 10 failed logins inside a 60-second window.
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
    /// Rolling window for the per-tenant EWMA, in seconds.
    pub rate_window_secs: u64,
    /// Minimum z-score before a rate spike fires.
    pub spike_z_threshold: f64,
    /// Window for failed-auth bursts, in seconds.
    pub auth_window_secs: u64,
    /// Failures inside the auth window that trigger Critical.
    pub auth_critical_count: u32,
    /// Failures inside the auth window that trigger Warning.
    pub auth_warning_count: u32,
    /// Maximum events kept in the in-memory ring buffer.
    pub event_buffer_size: usize,
    /// Emit novel-query informational events. Set false to suppress
    /// them on high-churn workloads (e.g. ad-hoc analytics).
    pub emit_novel_queries: bool,
}

impl Default for AnomalyConfig {
    fn default() -> Self {
        // Both sliding windows default to one minute.
        const ONE_MINUTE_SECS: u64 = 60;

        AnomalyConfig {
            rate_window_secs: ONE_MINUTE_SECS,
            auth_window_secs: ONE_MINUTE_SECS,
            spike_z_threshold: 3.0,
            auth_warning_count: 5,
            auth_critical_count: 10,
            event_buffer_size: 1024,
            emit_novel_queries: true,
        }
    }
}
136
/// Top-level detector. Cheap to clone: every field is an `Arc`, so
/// clones share the same underlying state. Per-key sliding windows and
/// the fingerprint set live in `DashMap`s (sharded locking); only the
/// event ring sits behind a single parking_lot `RwLock`.
#[derive(Clone)]
pub struct AnomalyDetector {
    config: Arc<AnomalyConfig>,
    // Per-key sliding windows live in DashMaps so concurrent sessions for
    // different tenants/users hit different shards instead of serializing
    // on one global writer per query.
    // Keyed by tenant.
    rate_windows: Arc<DashMap<String, RateWindow>>,
    // Keyed by (user, client_ip).
    auth_windows: Arc<DashMap<(String, String), AuthBurstWindow>>,
    // Used as a set: the value is always `()`.
    seen_fingerprints: Arc<DashMap<String, ()>>,
    // Bounded FIFO of recent events; newest at the back.
    events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
}

/// Upper bound on the novel-query fingerprint set. Without a cap the set
/// grows unbounded on high-cardinality SQL (a slow memory leak); the
/// detector is informational, so clearing on overflow is acceptable.
const MAX_SEEN_FINGERPRINTS: usize = 100_000;
156
157impl AnomalyDetector {
158    pub fn new(config: AnomalyConfig) -> Self {
159        Self {
160            config: Arc::new(config),
161            rate_windows: Arc::new(DashMap::new()),
162            auth_windows: Arc::new(DashMap::new()),
163            seen_fingerprints: Arc::new(DashMap::new()),
164            events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
165        }
166    }
167
168    /// Record a query event. Detectors may emit zero or more events.
169    /// Returns the events emitted by THIS call (the caller can also
170    /// poll `recent_events` for the full ring buffer).
171    pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
172        let mut emitted = Vec::new();
173
174        // Rate-spike detector. Per-tenant shard lock only.
175        {
176            let mut window = self
177                .rate_windows
178                .entry(ctx.tenant.clone())
179                .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
180            if let Some(spike) = window.observe_and_score(ctx.timestamp) {
181                if spike.z_score >= self.config.spike_z_threshold {
182                    let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
183                        Severity::Critical
184                    } else {
185                        Severity::Warning
186                    };
187                    let ev = AnomalyEvent::RateSpike {
188                        tenant: ctx.tenant.clone(),
189                        rate_per_sec: spike.rate,
190                        baseline: spike.baseline,
191                        z_score: spike.z_score,
192                        severity,
193                        detected_at: chrono::Utc::now().to_rfc3339(),
194                    };
195                    drop(window); // release shard before pushing to the event ring
196                    emitted.push(ev.clone());
197                    self.push_event(ev);
198                }
199            }
200        }
201
202        // Novel-query detector. The common already-seen path takes only a
203        // shard read; a fresh fingerprint upgrades to a shard write.
204        if self.config.emit_novel_queries
205            && !self.seen_fingerprints.contains_key(&ctx.fingerprint)
206        {
207            // Bound the set so unique-SQL traffic can't leak memory.
208            if self.seen_fingerprints.len() >= MAX_SEEN_FINGERPRINTS {
209                self.seen_fingerprints.clear();
210            }
211            // insert returns the prior value; None means we won the race to
212            // first-see this fingerprint, so emit exactly once.
213            if self
214                .seen_fingerprints
215                .insert(ctx.fingerprint.clone(), ())
216                .is_none()
217            {
218                let ev = AnomalyEvent::NovelQuery {
219                    fingerprint: ctx.fingerprint.clone(),
220                    sql_excerpt: excerpt(&ctx.sql, 120),
221                    detected_at: chrono::Utc::now().to_rfc3339(),
222                };
223                emitted.push(ev.clone());
224                self.push_event(ev);
225            }
226        }
227
228        // SQL-injection detector. Pure heuristic — runs even if the
229        // upstream pre-query already passed; multiple layers is the
230        // point.
231        let matches = sql_injection::scan(&ctx.sql);
232        if !matches.is_empty() {
233            let severity = if matches.len() >= 2 {
234                Severity::Critical
235            } else {
236                Severity::Warning
237            };
238            let ev = AnomalyEvent::SqlInjection {
239                sql_excerpt: excerpt(&ctx.sql, 200),
240                patterns_matched: matches,
241                severity,
242                detected_at: chrono::Utc::now().to_rfc3339(),
243            };
244            emitted.push(ev.clone());
245            self.push_event(ev);
246        }
247
248        emitted
249    }
250
251    /// Record an authentication outcome. Failed auths feed the
252    /// credential-stuffing detector.
253    pub fn record_auth(
254        &self,
255        user: &str,
256        client_ip: &str,
257        succeeded: bool,
258        timestamp: Instant,
259        iso_timestamp: &str,
260    ) -> Option<AnomalyEvent> {
261        if succeeded {
262            // Successful auth resets the burst counter — common
263            // case after the operator unlocks an account.
264            self.auth_windows
265                .remove(&(user.to_string(), client_ip.to_string()));
266            return None;
267        }
268        let count = {
269            let mut window = self
270                .auth_windows
271                .entry((user.to_string(), client_ip.to_string()))
272                .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
273            window.record_failure(timestamp)
274        };
275        let severity = if count >= self.config.auth_critical_count {
276            Severity::Critical
277        } else if count >= self.config.auth_warning_count {
278            Severity::Warning
279        } else {
280            return None;
281        };
282        let ev = AnomalyEvent::AuthBurst {
283            user: user.to_string(),
284            client_ip: client_ip.to_string(),
285            failures: count,
286            window_secs: self.config.auth_window_secs as u32,
287            severity,
288            detected_at: iso_timestamp.to_string(),
289        };
290        self.push_event(ev.clone());
291        Some(ev)
292    }
293
294    /// Snapshot of the most recent events. Newest first.
295    pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
296        let evs = self.events.read();
297        let n = limit.min(evs.len());
298        let mut out = Vec::with_capacity(n);
299        for ev in evs.iter().rev().take(n) {
300            out.push(ev.clone());
301        }
302        out
303    }
304
305    /// Total events ever recorded (since process start). Useful for
306    /// metrics export.
307    pub fn event_count(&self) -> usize {
308        self.events.read().len()
309    }
310
311    fn push_event(&self, ev: AnomalyEvent) {
312        let mut evs = self.events.write();
313        if evs.len() >= self.config.event_buffer_size {
314            evs.pop_front();
315        }
316        evs.push_back(ev);
317    }
318}
319
/// Per-query observation passed to the detector. Built by the proxy
/// at hook time; populated as much as the proxy knows about the
/// query.
#[derive(Debug, Clone)]
pub struct QueryObservation {
    /// Tenant identifier (or "default" / "" when no multi-tenancy).
    /// Keys the per-tenant rate window.
    pub tenant: String,
    /// Canonical query fingerprint (literals normalised). Same shape
    /// the analytics module produces. Keys the novel-query set.
    pub fingerprint: String,
    /// Raw SQL — used for SQL-injection scanning + UI excerpt.
    pub sql: String,
    /// Monotonic instant the query arrived (a `std::time::Instant`, not
    /// wall-clock time). Detectors compute rates against this.
    pub timestamp: Instant,
}
336
/// Failed-auth instants for one (user, ip) pair. Each time a failure is
/// recorded, entries older than `window_secs` (the auth window, 60s by
/// default) are trimmed from the front.
struct AuthBurstWindow {
    window: Duration,
    failures: VecDeque<Instant>,
}

impl AuthBurstWindow {
    fn new(window_secs: u64) -> Self {
        let window = Duration::from_secs(window_secs);
        AuthBurstWindow {
            window,
            failures: VecDeque::new(),
        }
    }

    /// Record a failure at `now` and return how many failures the window
    /// holds afterwards, including this one.
    fn record_failure(&mut self, now: Instant) -> u32 {
        let horizon = self.window;
        // Oldest entries sit at the front; discard until one is recent
        // enough (an entry exactly `horizon` old is kept).
        while self
            .failures
            .front()
            .map_or(false, |&oldest| now.duration_since(oldest) > horizon)
        {
            self.failures.pop_front();
        }
        self.failures.push_back(now);
        self.failures.len() as u32
    }
}
365
/// Return `s` unchanged if it is at most `max` bytes long. Otherwise
/// return its longest prefix of at most `max` bytes that ends on a UTF-8
/// character boundary, followed by `…`.
///
/// Slicing at a raw byte offset would panic whenever `max` lands inside
/// a multi-byte character. That is easy to trigger, since callers pass
/// raw client SQL.
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // Walk back to the nearest char boundary; offset 0 always is one, so
    // this terminates.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    format!("{}…", &s[..end])
}
373
#[cfg(test)]
mod tests {
    use super::*;

    /// Observation for the given tenant / fingerprint / SQL, stamped now.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            tenant: tenant.to_owned(),
            fingerprint: fp.to_owned(),
            sql: sql.to_owned(),
            timestamp: Instant::now(),
        }
    }

    /// Whether the batch contains a first-seen-fingerprint event.
    fn contains_novel(batch: &[AnomalyEvent]) -> bool {
        batch
            .iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Report `times` failed logins for alice from 10.0.0.1, all at `at`,
    /// and return whatever the final call produced.
    fn fail_alice(d: &AnomalyDetector, times: usize, at: Instant) -> Option<AnomalyEvent> {
        let mut outcome = None;
        for _ in 0..times {
            outcome = d.record_auth("alice", "10.0.0.1", false, at, "ts");
        }
        outcome
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let first = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(contains_novel(&first));
        let second = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!contains_novel(&second));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let detector = AnomalyDetector::new(AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        });
        let batch = detector.record_query(&obs("acme", "fp1", "SELECT 1"));
        assert!(!contains_novel(&batch));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let evs = detector.record_query(&obs(
            "acme",
            "fp-inj",
            "SELECT * FROM users WHERE id = 1 OR 1=1 --",
        ));
        let flagged = evs
            .iter()
            .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. }));
        assert!(flagged, "expected SqlInjection event in {:?}", evs);
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&detector, 6, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 6);
                assert_eq!(severity, Severity::Warning);
            }
            other => panic!("expected AuthBurst Warning, got {:?}", other),
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&detector, 12, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 12);
                assert_eq!(severity, Severity::Critical);
            }
            other => panic!("expected AuthBurst Critical, got {:?}", other),
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        let now = Instant::now();
        let _ = fail_alice(&detector, 6, now);
        // A successful auth clears the window, so the next failure counts as 1.
        let _ = detector.record_auth("alice", "10.0.0.1", true, now, "ts");
        let after = detector.record_auth("alice", "10.0.0.1", false, now, "ts");
        // One failure is below the warning threshold (5), so no event.
        assert!(after.is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        for &(fp, sql) in &[("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")] {
            let _ = detector.record_query(&obs("a", fp, sql));
        }
        let recent = detector.recent_events(10);
        // The newest entry should be the novel-query event for fp3.
        match &recent[0] {
            AnomalyEvent::NovelQuery { fingerprint, .. } => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let detector = AnomalyDetector::new(AnomalyConfig::default());
        (0..50).for_each(|i| {
            let _ = detector.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        });
        assert_eq!(detector.recent_events(10).len(), 10);
        assert_eq!(detector.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let detector = AnomalyDetector::new(AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        });
        (0..20).for_each(|i| {
            let _ = detector.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        });
        // The ring holds at most 5 entries. event_count reports the current
        // ring length, not a lifetime total.
        assert_eq!(detector.event_count(), 5);
    }
}