heliosdb_proxy/anomaly/
ewma.rs1use std::time::{Duration, Instant};
14
/// Exponentially weighted moving average over a stream of `f64` samples.
///
/// The average is unset until the first call to [`Ewma::observe`], which
/// seeds it with that sample. Every later sample `x` moves the average to
/// `alpha * x + (1 - alpha) * previous`.
#[derive(Debug, Clone)]
pub struct Ewma {
    /// Weight given to the newest sample; checked to lie in `(0, 1]`.
    alpha: f64,
    /// Current average, or `None` while no sample has been observed.
    value: Option<f64>,
}

impl Ewma {
    /// Creates an empty average with smoothing factor `alpha`.
    ///
    /// # Panics
    ///
    /// Panics when `alpha` is not in `(0, 1]`; NaN fails the check as well.
    pub fn new(alpha: f64) -> Self {
        assert!(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0, 1]");
        Ewma { value: None, alpha }
    }

    /// Folds the sample `x` into the average.
    ///
    /// The first sample becomes the average as-is; later samples are
    /// blended with the previous average using `alpha`.
    pub fn observe(&mut self, x: f64) {
        let blended = if let Some(prev) = self.value {
            self.alpha * x + (1.0 - self.alpha) * prev
        } else {
            x
        };
        self.value = Some(blended);
    }

    /// Returns the current average, or `None` before any observation.
    pub fn value(&self) -> Option<f64> {
        self.value
    }
}
41
/// Ring of per-second event counters used to compare the current second's
/// event count against the other seconds held in the window.
///
/// Each call to [`RateWindow::observe_and_score`] records one event. The
/// ring is rotated by whole elapsed seconds, and every second that passed
/// without events is represented by a zeroed bucket.
#[derive(Debug, Clone)]
pub struct RateWindow {
    /// Number of one-second buckets requested at construction.
    window_secs: u64,
    /// Event counts, one slot per second, used as a ring buffer.
    buckets: Vec<u64>,
    /// Index in `buckets` of the slot that receives new events.
    head: usize,
    /// Start of the second covered by the `head` bucket; `None` until the
    /// first observation arrives.
    anchor: Option<Instant>,
}

/// Result of scoring the current bucket against the rest of the window.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SpikeScore {
    /// Event count of the current bucket, including the event just recorded.
    pub rate: f64,
    /// Mean event count of all other buckets in the window.
    pub baseline: f64,
    /// `(rate - baseline)` divided by the population standard deviation of
    /// the other buckets.
    pub z_score: f64,
}

impl RateWindow {
    /// Creates a window holding `window_secs` one-second buckets, all zero.
    ///
    /// # Panics
    ///
    /// Panics if `window_secs` is smaller than 2, because scoring needs at
    /// least one bucket besides the current one.
    pub fn new(window_secs: u64) -> Self {
        assert!(window_secs >= 2, "window must hold at least 2 buckets");
        Self {
            window_secs,
            buckets: vec![0u64; window_secs as usize],
            head: 0,
            anchor: None,
        }
    }

    /// Records one event at `now` and scores the current bucket.
    ///
    /// Returns `None` when fewer than half of the window's slots (counted
    /// over the non-current buckets) are non-zero, or when the non-current
    /// buckets have zero variance, since no meaningful z-score exists then.
    ///
    /// An instant earlier than the window's anchor does not rotate the ring;
    /// the event is counted in the current bucket.
    pub fn observe_and_score(&mut self, now: Instant) -> Option<SpikeScore> {
        self.advance_to(now);
        let head = self.head;
        self.buckets[head] = self.buckets[head].saturating_add(1);

        let n = self.buckets.len();
        // `new` guarantees n >= 2, so there is always at least one prior bucket.
        let prior_len = (n - 1) as f64;

        let populated = self.prior_counts().filter(|&count| count > 0).count();
        if populated < n / 2 {
            return None;
        }

        // Saturating, to match the saturating increment of individual buckets.
        let prior_sum = self.prior_counts().fold(0u64, u64::saturating_add);
        let mean = prior_sum as f64 / prior_len;
        let variance = self
            .prior_counts()
            .map(|count| {
                let delta = count as f64 - mean;
                delta * delta
            })
            .sum::<f64>()
            / prior_len;
        let std_dev = variance.sqrt();
        if std_dev <= 0.0 {
            return None;
        }

        let rate = self.buckets[head] as f64;
        Some(SpikeScore {
            rate,
            baseline: mean,
            z_score: (rate - mean) / std_dev,
        })
    }

    /// Iterates over every bucket except the current one, in index order,
    /// without allocating.
    fn prior_counts(&self) -> impl Iterator<Item = u64> + '_ {
        let head = self.head;
        self.buckets
            .iter()
            .enumerate()
            .filter(move |&(idx, _)| idx != head)
            .map(|(_, &count)| count)
    }

    /// Rotates the ring so that `head` is the bucket for the second that
    /// contains `now`, zeroing every bucket that is stepped onto.
    fn advance_to(&mut self, now: Instant) {
        let anchor = match self.anchor {
            Some(anchor) => anchor,
            None => {
                // First observation: it defines where second zero starts.
                self.anchor = Some(now);
                return;
            }
        };

        // Saturating on purpose: `duration_since` is documented as allowed to
        // panic when `now` is earlier than `anchor`, which can happen when
        // callers hand in timestamps out of order.
        let secs_advanced = now.saturating_duration_since(anchor).as_secs();
        if secs_advanced == 0 {
            return;
        }

        // One full lap already clears every bucket, so never step further.
        let steps = self.window_secs.min(secs_advanced);
        let len = self.buckets.len();
        for _ in 0..steps {
            self.head = (self.head + 1) % len;
            self.buckets[self.head] = 0;
        }

        // Move by whole seconds only, keeping the sub-second phase of the
        // first observation.
        self.anchor = Some(anchor + Duration::from_secs(secs_advanced));
    }
}
145
#[cfg(test)]
mod tests {
    use super::*;

    // The first sample becomes the average verbatim.
    #[test]
    fn ewma_seeds_with_first_observation() {
        let mut avg = Ewma::new(0.5);
        assert_eq!(avg.value(), None);
        avg.observe(10.0);
        assert_eq!(avg.value(), Some(10.0));
    }

    // With alpha = 0.5 each new sample is averaged with the previous value.
    #[test]
    fn ewma_smooths_subsequent_observations() {
        let mut avg = Ewma::new(0.5);
        avg.observe(10.0);
        avg.observe(20.0);
        let after_second = avg.value().unwrap();
        assert!((after_second - 15.0).abs() < 1e-9);
        avg.observe(30.0);
        let after_third = avg.value().unwrap();
        assert!((after_third - 22.5).abs() < 1e-9);
    }

    // A single event leaves every prior bucket empty, so nothing is scored.
    #[test]
    fn rate_window_returns_none_before_window_fills() {
        let mut window = RateWindow::new(10);
        let outcome = window.observe_and_score(Instant::now());
        assert!(outcome.is_none());
    }

    // Nine seconds alternating between 1 and 2 events, then a burst of 50
    // events in the tenth second.
    #[test]
    fn rate_window_scores_a_clean_spike() {
        let mut window = RateWindow::new(10);
        let mut clock = Instant::now();
        for second in 0..9 {
            clock += Duration::from_secs(1);
            let hits = if second % 2 == 0 { 1 } else { 2 };
            for _ in 0..hits {
                let _ = window.observe_and_score(clock);
            }
        }
        clock += Duration::from_secs(1);
        // Keep only the score produced by the final event of the burst.
        let last = (0..50).fold(None, |_, _| window.observe_and_score(clock));
        let score = last.expect("should have a score after window fills");
        assert!(
            score.z_score > 5.0,
            "expected a large z-score, got {:?}",
            score
        );
    }

    // A gap longer than the window must wipe every earlier bucket.
    #[test]
    fn rate_window_zero_fills_idle_gaps() {
        let mut window = RateWindow::new(5);
        let start = Instant::now();
        for _ in 0..3 {
            let _ = window.observe_and_score(start);
        }
        let later = start + Duration::from_secs(10);
        let _ = window.observe_and_score(later);
        let outcome = window.observe_and_score(later);
        assert!(outcome.is_none(), "all-zero prior should produce no score");
    }

    // A window of one bucket has nothing to compare against.
    #[test]
    fn rate_window_panics_on_too_small_window() {
        let attempt = std::panic::catch_unwind(|| RateWindow::new(1));
        assert!(attempt.is_err());
    }

    // Both ends of the invalid alpha range are rejected.
    #[test]
    fn ewma_panics_on_invalid_alpha() {
        let zero = std::panic::catch_unwind(|| Ewma::new(0.0));
        assert!(zero.is_err());
        let too_large = std::panic::catch_unwind(|| Ewma::new(1.5));
        assert!(too_large.is_err());
    }
}