Skip to main content

heliosdb_proxy/anomaly/
ewma.rs

1//! EWMA + sliding-bucket rate window. Used by the rate-spike
2//! detector to compute "queries per second" over a rolling window
3//! and a baseline EWMA against which spikes are scored.
4//!
//! Bucket layout: 1-second buckets in a fixed ring. `observe_and_score`
//! advances the ring as time passes (zero-filling skipped seconds),
//! bumps the current bucket, and recomputes the mean + std-dev of the
//! other (prior) buckets on every call.
9//!
10//! Memory: 8 bytes per bucket × `window_secs`. At the default 60s
11//! window that's 480 bytes per tenant. Cheap.
12
13use std::time::{Duration, Instant};
14
/// Exponentially-weighted moving average, updated online in O(1).
///
/// The first observation seeds the estimate directly; every later
/// observation `x` moves it to `alpha * x + (1 - alpha) * previous`.
#[derive(Debug, Clone)]
pub struct Ewma {
    /// Smoothing factor in (0, 1]. Larger values weight recent
    /// observations more heavily.
    alpha: f64,
    /// Current estimate; `None` until something has been observed.
    value: Option<f64>,
}

impl Ewma {
    /// Builds an empty average with smoothing factor `alpha`.
    ///
    /// # Panics
    ///
    /// Panics unless `0 < alpha <= 1` (a NaN `alpha` is rejected too,
    /// since every comparison against NaN is false).
    pub fn new(alpha: f64) -> Self {
        let in_range = alpha > 0.0 && alpha <= 1.0;
        assert!(in_range, "alpha must be in (0, 1]");
        Ewma { alpha, value: None }
    }

    /// Folds one sample into the running estimate.
    pub fn observe(&mut self, x: f64) {
        let next = if let Some(prev) = self.value {
            self.alpha * x + (1.0 - self.alpha) * prev
        } else {
            // Nothing to blend with yet: the first sample is the estimate.
            x
        };
        self.value = Some(next);
    }

    /// Current estimate, or `None` if nothing has been observed yet.
    pub fn value(&self) -> Option<f64> {
        self.value
    }
}
41
/// Sliding-bucket rate counter: per-second event counts kept in a fixed
/// ring of `window_secs` buckets.
///
/// `observe_and_score`:
///   1. Advances the ring to the current second (zero-filling skipped
///      seconds) and increments that second's (head) bucket.
///   2. Computes the mean + population std-dev across the other (prior)
///      buckets.
///   3. Returns z-score = (current_rate - mean) / std-dev once at least
///      `window_secs / 2` (integer division) prior buckets are non-zero
///      AND std-dev > 0. The window does not have to be completely full.
#[derive(Debug, Clone)]
pub struct RateWindow {
    /// Number of one-second buckets in the ring.
    window_secs: u64,
    /// Ring of per-second counts. Length == window_secs.
    buckets: Vec<u64>,
    /// Index into `buckets` for the most recent second.
    head: usize,
    /// Monotonic `Instant` at which the head bucket's second began.
    /// Advancing time rotates the ring and zero-fills skipped seconds.
    /// `None` until the first observation.
    anchor: Option<Instant>,
}
60
/// Result of scoring the current second against the prior buckets of a
/// [`RateWindow`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SpikeScore {
    /// Events counted so far in the current (head) second.
    pub rate: f64,
    /// Mean per-second count across the prior (non-head) buckets.
    pub baseline: f64,
    /// `(rate - baseline) / std-dev`, using the population std-dev of the
    /// prior buckets.
    pub z_score: f64,
}
67
68impl RateWindow {
69    pub fn new(window_secs: u64) -> Self {
70        assert!(window_secs >= 2, "window must hold at least 2 buckets");
71        Self {
72            window_secs,
73            buckets: vec![0u64; window_secs as usize],
74            head: 0,
75            anchor: None,
76        }
77    }
78
79    /// Bump the current bucket by one event and return a SpikeScore
80    /// when the window has accumulated enough data to be meaningful
81    /// (full window AND non-zero std-dev).
82    pub fn observe_and_score(&mut self, now: Instant) -> Option<SpikeScore> {
83        self.advance_to(now);
84        self.buckets[self.head] = self.buckets[self.head].saturating_add(1);
85
86        // Need at least window_secs * 0.5 buckets populated AND
87        // non-zero variance in the prior buckets to score.
88        let n = self.buckets.len();
89        let prior: Vec<u64> = (0..n)
90            .filter(|&i| i != self.head)
91            .map(|i| self.buckets[i])
92            .collect();
93        let prior_sum: u64 = prior.iter().sum();
94        let populated = prior.iter().filter(|&&v| v > 0).count();
95        if populated < (n / 2) {
96            return None;
97        }
98        let mean = prior_sum as f64 / prior.len() as f64;
99        let var = prior
100            .iter()
101            .map(|&v| {
102                let d = v as f64 - mean;
103                d * d
104            })
105            .sum::<f64>()
106            / prior.len() as f64;
107        let std = var.sqrt();
108        if std <= 0.0 {
109            return None;
110        }
111        let rate = self.buckets[self.head] as f64;
112        let z = (rate - mean) / std;
113        Some(SpikeScore {
114            rate,
115            baseline: mean,
116            z_score: z,
117        })
118    }
119
120    /// Advance the head pointer to track wall-clock seconds. Skipped
121    /// seconds get zero-filled buckets.
122    fn advance_to(&mut self, now: Instant) {
123        let anchor = match self.anchor {
124            None => {
125                self.anchor = Some(now);
126                return;
127            }
128            Some(a) => a,
129        };
130        let elapsed = now.duration_since(anchor);
131        let secs_advanced = elapsed.as_secs();
132        if secs_advanced == 0 {
133            return;
134        }
135        // Cap advancement at the ring size — anything beyond means
136        // we wrap and effectively reset the window.
137        let cap = self.window_secs.min(secs_advanced);
138        for _ in 0..cap {
139            self.head = (self.head + 1) % self.buckets.len();
140            self.buckets[self.head] = 0;
141        }
142        self.anchor = Some(anchor + Duration::from_secs(secs_advanced));
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ewma_seeds_with_first_observation() {
        let mut e = Ewma::new(0.5);
        assert_eq!(e.value(), None);
        e.observe(10.0);
        assert_eq!(e.value(), Some(10.0));
    }

    #[test]
    fn ewma_smooths_subsequent_observations() {
        let mut e = Ewma::new(0.5);
        e.observe(10.0);
        e.observe(20.0);
        // 0.5 * 20 + 0.5 * 10 = 15
        assert!((e.value().unwrap() - 15.0).abs() < 1e-9);
        e.observe(30.0);
        // 0.5 * 30 + 0.5 * 15 = 22.5
        assert!((e.value().unwrap() - 22.5).abs() < 1e-9);
    }

    #[test]
    fn rate_window_returns_none_before_window_fills() {
        let mut w = RateWindow::new(10);
        let r = w.observe_and_score(Instant::now());
        assert!(r.is_none());
    }

    #[test]
    fn rate_window_scores_a_clean_spike() {
        let mut w = RateWindow::new(10);
        let mut t = Instant::now();
        // Seed 9 prior seconds alternating 1 and 2 events. A perfectly
        // uniform prior would have std-dev 0 and never score, so the
        // alternation gives it a small non-zero spread (mean ~1.44).
        for i in 0..9 {
            t += Duration::from_secs(1);
            for _ in 0..(1 + (i % 2) as u32) {
                let _ = w.observe_and_score(t);
            }
        }
        // Spike: 50 events in the 10th second.
        t += Duration::from_secs(1);
        let mut last = None;
        for _ in 0..50 {
            last = w.observe_and_score(t);
        }
        let score = last.expect("should have a score after window fills");
        assert!(
            score.z_score > 5.0,
            "expected a large z-score, got {:?}",
            score
        );
    }

    #[test]
    fn rate_window_zero_fills_idle_gaps() {
        let mut w = RateWindow::new(5);
        let t = Instant::now();
        // Three events at `t`, then a 10-second idle gap (longer than the
        // 5-bucket window), which must zero every bucket.
        for _ in 0..3 {
            let _ = w.observe_and_score(t);
        }
        let later = t + Duration::from_secs(10);
        let _ = w.observe_and_score(later);
        let r = w.observe_and_score(later);
        // Only the head bucket holds anything: the two events at `later`.
        assert_eq!(w.buckets[w.head], 2);
        assert_eq!(w.buckets.iter().sum::<u64>(), 2);
        // Every prior bucket is zero, so the population gate (fewer than
        // window_secs / 2 non-zero prior buckets) rejects the window.
        assert!(r.is_none(), "all-zero prior should produce no score");
    }

    #[test]
    fn rate_window_scores_once_half_the_prior_is_populated() {
        let t0 = Instant::now();
        let mut w = RateWindow::new(4);
        assert!(w.observe_and_score(t0).is_none());
        let t1 = t0 + Duration::from_secs(1);
        assert!(w.observe_and_score(t1).is_none());
        assert!(w.observe_and_score(t1).is_none());
        // Prior buckets are now [1, 2, 0]: 2 of 3 populated, which meets
        // the 4 / 2 gate even though the window is not full.
        let t2 = t0 + Duration::from_secs(2);
        assert_eq!(
            w.observe_and_score(t2),
            Some(SpikeScore {
                rate: 1.0,
                baseline: 1.0,
                z_score: 0.0,
            })
        );
    }

    #[test]
    fn rate_window_ignores_instants_before_the_anchor() {
        let t0 = Instant::now();
        let mut w = RateWindow::new(4);
        let _ = w.observe_and_score(t0 + Duration::from_secs(5));
        let head = w.head;
        // An out-of-order instant must neither panic nor rotate the ring.
        let _ = w.observe_and_score(t0);
        assert_eq!(w.head, head);
        assert_eq!(w.buckets[head], 2);
    }

    #[test]
    fn rate_window_panics_on_too_small_window() {
        let res = std::panic::catch_unwind(|| RateWindow::new(1));
        assert!(res.is_err());
    }

    #[test]
    fn ewma_panics_on_invalid_alpha() {
        let res = std::panic::catch_unwind(|| Ewma::new(0.0));
        assert!(res.is_err());
        let res = std::panic::catch_unwind(|| Ewma::new(1.5));
        assert!(res.is_err());
    }
}