heliosdb-proxy 0.4.2

HeliosProxy - Intelligent connection router and failover manager for HeliosDB and PostgreSQL
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//! Anomaly detection (T3.1).
//!
//! Statistical + heuristic detector for production-shape security
//! and operational anomalies. In-process sliding windows; no
//! external data store. Four detector families:
//!
//! 1. **Rate spike** — z-score on per-tenant queries-per-second
//!    against a rolling EWMA baseline.
//! 2. **Credential stuffing** — failed-auth burst per (user, ip)
//!    inside a sliding window (`auth_window_secs`, 60s by default).
//! 3. **SQL injection** — heuristic pattern match against well-known
//!    payload shapes (UNION-based, comment escapes, stacked queries,
//!    boolean blind, time-based blind).
//! 4. **Novel query** — a query fingerprint never seen before. Always
//!    emitted at `Info` severity as an informational signal; on
//!    high-churn workloads (e.g. ad-hoc analytics) admins can switch it
//!    off via `AnomalyConfig::emit_novel_queries`.
//!
//! Why not a trained classifier today?
//!
//! Production anomaly classifiers want labels — feedback loops from
//! analyst-marked false positives. Without that loop in place, a
//! trained model overfits to whatever traffic was present at
//! training time. Statistical detectors are honest about their
//! priors (the EWMA + z-score) and degrade gracefully. The
//! [`AnomalyEvent`] trail makes it possible to bolt a learned
//! classifier on later: events become labeled training data.

use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};

use parking_lot::RwLock;
use serde::{Deserialize, Serialize};

pub mod ewma;
pub mod sql_injection;

pub use ewma::{Ewma, RateWindow};

/// Anomaly severity — surfaces in admin output and lets operators
/// filter detections at scale.
///
/// Variants are declared from least to most severe, so the derived
/// `Ord` ranks `Info < Warning < Critical` and filters can be written
/// as `event.severity() >= Severity::Warning`. Serialized in lowercase
/// (`"info"`, `"warning"`, `"critical"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    /// Informational only, e.g. a first-seen query fingerprint.
    Info,
    /// Middle tier, e.g. a single SQL-injection pattern match or an
    /// auth burst past the warning count.
    Warning,
    /// Top tier, e.g. two or more SQL-injection patterns or a rate
    /// spike at twice the z-score threshold.
    Critical,
}

/// One anomaly detection event.
///
/// Serialized (via serde) as an internally tagged object: the variant
/// name in snake_case is written under a `kind` key (e.g.
/// `"kind": "rate_spike"`) next to the variant's fields. Every variant
/// carries `detected_at`, copied verbatim from the caller-supplied ISO
/// timestamp.
#[derive(Debug, Clone, Serialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum AnomalyEvent {
    /// Per-tenant request-rate spike against the rolling baseline.
    /// Warning once `z_score` reaches `spike_z_threshold`, Critical at
    /// twice that threshold.
    RateSpike {
        /// Tenant taken from the triggering query observation.
        tenant: String,
        /// Observed rate, as reported by the tenant's `RateWindow`.
        rate_per_sec: f64,
        /// Baseline rate the observation was scored against.
        baseline: f64,
        /// Score of the observed rate against `baseline`.
        z_score: f64,
        severity: Severity,
        detected_at: String,
    },
    /// Failed-auth burst from a single (user, ip) pair.
    AuthBurst {
        user: String,
        client_ip: String,
        /// Failures still inside the window, including the one that
        /// produced this event.
        failures: u32,
        /// Configured `auth_window_secs`.
        window_secs: u32,
        severity: Severity,
        detected_at: String,
    },
    /// SQL-injection-shaped statement matched one or more
    /// well-known payload patterns.
    SqlInjection {
        /// At most the first 200 bytes of the statement.
        sql_excerpt: String,
        /// Pattern names returned by `sql_injection::scan`; two or more
        /// escalate the event to Critical.
        patterns_matched: Vec<String>,
        severity: Severity,
        detected_at: String,
    },
    /// First-seen query fingerprint. Informational by default.
    NovelQuery {
        /// The fingerprint that had not been seen before.
        fingerprint: String,
        /// At most the first 120 bytes of the statement.
        sql_excerpt: String,
        detected_at: String,
    },
}

impl AnomalyEvent {
    /// Severity of this event, for filtering and admin output.
    ///
    /// `NovelQuery` has no severity field of its own and is always
    /// reported as [`Severity::Info`].
    pub fn severity(&self) -> Severity {
        match *self {
            Self::RateSpike { severity, .. }
            | Self::AuthBurst { severity, .. }
            | Self::SqlInjection { severity, .. } => severity,
            Self::NovelQuery { .. } => Severity::Info,
        }
    }
}

/// Tunables for [`AnomalyDetector`]. Defaults match production-friendly
/// behaviour: a rate spike fires at a z-score of 3.0 (3σ) or more, and a
/// credential burst reaches Warning at 5 and Critical at 10 failures
/// inside a 60s window.
#[derive(Debug, Clone)]
pub struct AnomalyConfig {
    /// Window length, in seconds, passed to `RateWindow::new` for each
    /// tenant. NOTE(review): the exact EWMA/window semantics live in the
    /// `ewma` module — confirm there.
    pub rate_window_secs: u64,
    /// Minimum z-score before a rate spike fires (compared with `>=`).
    /// Scores at or above twice this value are reported as Critical.
    pub spike_z_threshold: f64,
    /// Window for failed-auth bursts, in seconds. Failures older than
    /// this are evicted before the burst is counted.
    pub auth_window_secs: u64,
    /// Failures inside the auth window that trigger Critical. Checked
    /// before `auth_warning_count`, so it should be the larger of the two.
    pub auth_critical_count: u32,
    /// Failures inside the auth window that trigger Warning.
    pub auth_warning_count: u32,
    /// Maximum events kept in the in-memory ring buffer; the oldest
    /// event is dropped when a new one arrives at capacity.
    pub event_buffer_size: usize,
    /// Treat novel queries as informational events. Set false to
    /// suppress on high-churn workloads (e.g. ad-hoc analytics); when
    /// false, fingerprints are not tracked at all.
    pub emit_novel_queries: bool,
}

impl Default for AnomalyConfig {
    fn default() -> Self {
        Self {
            rate_window_secs: 60,
            spike_z_threshold: 3.0,
            auth_window_secs: 60,
            auth_critical_count: 10,
            auth_warning_count: 5,
            event_buffer_size: 1024,
            emit_novel_queries: true,
        }
    }
}

/// Top-level detector. Cheap to clone via Arc — every field is an `Arc`,
/// so clones share the same state. Each detector's state is guarded by
/// its own parking_lot `RwLock` to avoid cross-detector contention.
///
/// NOTE(review): `rate_windows`, `auth_windows` and `seen_fingerprints`
/// are never pruned in the code shown (auth entries are only removed on
/// a successful login), so they grow with the number of distinct
/// tenants, (user, ip) pairs and fingerprints. Consider a cap if those
/// keys are attacker-influenced.
#[derive(Clone)]
pub struct AnomalyDetector {
    /// Shared tunables; never mutated after construction.
    config: Arc<AnomalyConfig>,
    /// Per-tenant rate windows, keyed by `QueryObservation::tenant`.
    rate_windows: Arc<RwLock<HashMap<String, RateWindow>>>,
    /// Failed-auth windows keyed by (user, client_ip).
    auth_windows: Arc<RwLock<HashMap<(String, String), AuthBurstWindow>>>,
    /// Fingerprints already reported as novel; used as a set (values are `()`).
    seen_fingerprints: Arc<RwLock<HashMap<String, ()>>>,
    /// Ring buffer of recent events, oldest at the front.
    events: Arc<RwLock<VecDeque<AnomalyEvent>>>,
}

impl AnomalyDetector {
    pub fn new(config: AnomalyConfig) -> Self {
        Self {
            config: Arc::new(config),
            rate_windows: Arc::new(RwLock::new(HashMap::new())),
            auth_windows: Arc::new(RwLock::new(HashMap::new())),
            seen_fingerprints: Arc::new(RwLock::new(HashMap::new())),
            events: Arc::new(RwLock::new(VecDeque::with_capacity(1024))),
        }
    }

    /// Record a query event. Detectors may emit zero or more events.
    /// Returns the events emitted by THIS call (the caller can also
    /// poll `recent_events` for the full ring buffer).
    pub fn record_query(&self, ctx: &QueryObservation) -> Vec<AnomalyEvent> {
        let mut emitted = Vec::new();

        // Rate-spike detector.
        let mut rates = self.rate_windows.write();
        let window = rates
            .entry(ctx.tenant.clone())
            .or_insert_with(|| RateWindow::new(self.config.rate_window_secs));
        if let Some(spike) = window.observe_and_score(ctx.timestamp) {
            if spike.z_score >= self.config.spike_z_threshold {
                let severity = if spike.z_score >= self.config.spike_z_threshold * 2.0 {
                    Severity::Critical
                } else {
                    Severity::Warning
                };
                let ev = AnomalyEvent::RateSpike {
                    tenant: ctx.tenant.clone(),
                    rate_per_sec: spike.rate,
                    baseline: spike.baseline,
                    z_score: spike.z_score,
                    severity,
                    detected_at: ctx.iso_timestamp.clone(),
                };
                emitted.push(ev.clone());
                self.push_event(ev);
            }
        }
        drop(rates);

        // Novel-query detector.
        if self.config.emit_novel_queries {
            let mut seen = self.seen_fingerprints.write();
            if !seen.contains_key(&ctx.fingerprint) {
                seen.insert(ctx.fingerprint.clone(), ());
                let ev = AnomalyEvent::NovelQuery {
                    fingerprint: ctx.fingerprint.clone(),
                    sql_excerpt: excerpt(&ctx.sql, 120),
                    detected_at: ctx.iso_timestamp.clone(),
                };
                emitted.push(ev.clone());
                self.push_event(ev);
            }
        }

        // SQL-injection detector. Pure heuristic — runs even if the
        // upstream pre-query already passed; multiple layers is the
        // point.
        let matches = sql_injection::scan(&ctx.sql);
        if !matches.is_empty() {
            let severity = if matches.len() >= 2 {
                Severity::Critical
            } else {
                Severity::Warning
            };
            let ev = AnomalyEvent::SqlInjection {
                sql_excerpt: excerpt(&ctx.sql, 200),
                patterns_matched: matches,
                severity,
                detected_at: ctx.iso_timestamp.clone(),
            };
            emitted.push(ev.clone());
            self.push_event(ev);
        }

        emitted
    }

    /// Record an authentication outcome. Failed auths feed the
    /// credential-stuffing detector.
    pub fn record_auth(
        &self,
        user: &str,
        client_ip: &str,
        succeeded: bool,
        timestamp: Instant,
        iso_timestamp: &str,
    ) -> Option<AnomalyEvent> {
        if succeeded {
            // Successful auth resets the burst counter — common
            // case after the operator unlocks an account.
            self.auth_windows
                .write()
                .remove(&(user.to_string(), client_ip.to_string()));
            return None;
        }
        let mut windows = self.auth_windows.write();
        let window = windows
            .entry((user.to_string(), client_ip.to_string()))
            .or_insert_with(|| AuthBurstWindow::new(self.config.auth_window_secs));
        let count = window.record_failure(timestamp);
        let severity = if count >= self.config.auth_critical_count {
            Severity::Critical
        } else if count >= self.config.auth_warning_count {
            Severity::Warning
        } else {
            return None;
        };
        let ev = AnomalyEvent::AuthBurst {
            user: user.to_string(),
            client_ip: client_ip.to_string(),
            failures: count,
            window_secs: self.config.auth_window_secs as u32,
            severity,
            detected_at: iso_timestamp.to_string(),
        };
        drop(windows);
        self.push_event(ev.clone());
        Some(ev)
    }

    /// Snapshot of the most recent events. Newest first.
    pub fn recent_events(&self, limit: usize) -> Vec<AnomalyEvent> {
        let evs = self.events.read();
        let n = limit.min(evs.len());
        let mut out = Vec::with_capacity(n);
        for ev in evs.iter().rev().take(n) {
            out.push(ev.clone());
        }
        out
    }

    /// Total events ever recorded (since process start). Useful for
    /// metrics export.
    pub fn event_count(&self) -> usize {
        self.events.read().len()
    }

    fn push_event(&self, ev: AnomalyEvent) {
        let mut evs = self.events.write();
        if evs.len() >= self.config.event_buffer_size {
            evs.pop_front();
        }
        evs.push_back(ev);
    }
}

/// Per-query observation passed to the detector. Built by the proxy
/// at hook time; populated as much as the proxy knows about the
/// query.
#[derive(Debug, Clone)]
pub struct QueryObservation {
    /// Tenant identifier (or "default" / "" when no multi-tenancy).
    /// Keys the per-tenant rate window.
    pub tenant: String,
    /// Canonical query fingerprint (literals normalised). Same shape
    /// the analytics module produces. Keys the novel-query set.
    pub fingerprint: String,
    /// Raw SQL — used for SQL-injection scanning + UI excerpt.
    pub sql: String,
    /// Monotonic arrival time (an `Instant`, not wall-clock time). The
    /// rate-spike detector scores against this.
    pub timestamp: Instant,
    /// Pre-formatted RFC 3339 timestamp the proxy already has;
    /// detectors copy it verbatim into each event's `detected_at`
    /// rather than re-format.
    pub iso_timestamp: String,
}

/// Sliding window of failed-auth instants for one (user, ip) pair.
///
/// The window length is configurable (`auth_window_secs`, 60s by
/// default). Entries older than the window, measured from the newest
/// failure, are evicted lazily on each `record_failure` call. An entry
/// exactly one window old is still counted.
struct AuthBurstWindow {
    /// Window length.
    window: Duration,
    /// Failure instants, oldest at the front.
    failures: VecDeque<Instant>,
}

impl AuthBurstWindow {
    /// Create an empty window spanning `window_secs` seconds.
    fn new(window_secs: u64) -> Self {
        Self {
            window: Duration::from_secs(window_secs),
            failures: VecDeque::new(),
        }
    }

    /// Record a failure at `now` and return how many failures fall
    /// inside the window, including this one (saturating at `u32::MAX`).
    ///
    /// An out-of-order `now` (earlier than the oldest entry) measures as
    /// zero elapsed time, so nothing is evicted and nothing panics.
    fn record_failure(&mut self, now: Instant) -> u32 {
        while let Some(&oldest) = self.failures.front() {
            if now.saturating_duration_since(oldest) <= self.window {
                break;
            }
            self.failures.pop_front();
        }
        self.failures.push_back(now);
        self.failures.len().min(u32::MAX as usize) as u32
    }
}

/// Return at most the first `max` bytes of `s` as an owned string.
///
/// The cut moves back to the nearest UTF-8 character boundary, so the
/// result may be shorter than `max` bytes but never splits a character.
/// Slicing at a raw byte offset would panic whenever `max` lands inside
/// a multi-byte character (e.g. non-ASCII literals in client SQL).
fn excerpt(s: &str, max: usize) -> String {
    if s.len() <= max {
        return s.to_string();
    }
    // `max < s.len()` here, and `is_char_boundary(0)` is always true,
    // so this loop terminates with an in-bounds boundary.
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    s[..end].to_string()
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Observation stamped with `Instant::now()` and a fixed ISO time.
    fn obs(tenant: &str, fp: &str, sql: &str) -> QueryObservation {
        QueryObservation {
            tenant: tenant.into(),
            fingerprint: fp.into(),
            sql: sql.into(),
            timestamp: Instant::now(),
            iso_timestamp: "2026-04-25T13:30:00Z".into(),
        }
    }

    /// Whether `evs` holds at least one `NovelQuery` event.
    fn has_novel_query(evs: &[AnomalyEvent]) -> bool {
        evs.iter()
            .any(|e| matches!(e, AnomalyEvent::NovelQuery { .. }))
    }

    /// Report `n` failed logins for alice@10.0.0.1, all at instant `at`,
    /// and return the detector's answer to the last one.
    fn fail_alice(d: &AnomalyDetector, n: usize, at: Instant) -> Option<AnomalyEvent> {
        let mut last = None;
        for _ in 0..n {
            last = d.record_auth("alice", "10.0.0.1", false, at, "ts");
        }
        last
    }

    #[test]
    fn novel_query_fires_once_per_fingerprint() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        assert!(has_novel_query(&d.record_query(&obs("acme", "fp1", "SELECT 1"))));
        assert!(!has_novel_query(&d.record_query(&obs("acme", "fp1", "SELECT 1"))));
    }

    #[test]
    fn novel_query_can_be_suppressed_via_config() {
        let cfg = AnomalyConfig {
            emit_novel_queries: false,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        assert!(!has_novel_query(&d.record_query(&obs("acme", "fp1", "SELECT 1"))));
    }

    #[test]
    fn sql_injection_detector_flags_classic_or_payload() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        let evs = d.record_query(&obs(
            "acme",
            "fp-inj",
            "SELECT * FROM users WHERE id = 1 OR 1=1 --",
        ));
        assert!(
            evs.iter()
                .any(|e| matches!(e, AnomalyEvent::SqlInjection { .. })),
            "expected SqlInjection event in {:?}",
            evs
        );
    }

    #[test]
    fn auth_burst_warning_below_critical_threshold() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&d, 6, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 6);
                assert_eq!(severity, Severity::Warning);
            }
            other => panic!("expected AuthBurst Warning, got {:?}", other),
        }
    }

    #[test]
    fn auth_burst_critical_at_high_threshold() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        match fail_alice(&d, 12, Instant::now()) {
            Some(AnomalyEvent::AuthBurst { failures, severity, .. }) => {
                assert_eq!(failures, 12);
                assert_eq!(severity, Severity::Critical);
            }
            other => panic!("expected AuthBurst Critical, got {:?}", other),
        }
    }

    #[test]
    fn auth_success_resets_burst_window() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        let now = Instant::now();
        let _ = fail_alice(&d, 6, now);
        // Successful auth clears the window — next failure starts at 1.
        let _ = d.record_auth("alice", "10.0.0.1", true, now, "ts");
        // 1 failure is below the warning threshold (5) — None.
        assert!(fail_alice(&d, 1, now).is_none());
    }

    #[test]
    fn recent_events_returns_newest_first() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        for &(fp, sql) in &[("fp1", "SELECT 1"), ("fp2", "SELECT 2"), ("fp3", "SELECT 3")] {
            let _ = d.record_query(&obs("a", fp, sql));
        }
        // The newest novel-query event (fp3) must come first.
        match d.recent_events(10).first() {
            Some(AnomalyEvent::NovelQuery { fingerprint, .. }) => {
                assert_eq!(fingerprint, "fp3")
            }
            other => panic!("expected NovelQuery fp3, got {:?}", other),
        }
    }

    #[test]
    fn recent_events_respects_limit() {
        let d = AnomalyDetector::new(AnomalyConfig::default());
        for i in 0..50 {
            let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        }
        assert_eq!(d.recent_events(10).len(), 10);
        assert_eq!(d.recent_events(100).len(), 50);
    }

    #[test]
    fn event_buffer_evicts_oldest_when_full() {
        let cfg = AnomalyConfig {
            event_buffer_size: 5,
            ..AnomalyConfig::default()
        };
        let d = AnomalyDetector::new(cfg);
        for i in 0..20 {
            let _ = d.record_query(&obs("a", &format!("fp{}", i), "SELECT 1"));
        }
        // The buffer holds at most 5 events. event_count reports the
        // current buffer length, not a lifetime total.
        assert_eq!(d.event_count(), 5);
    }
}