use std::time::{Duration, Instant};
#[derive(Debug, Clone)]
pub struct Ewma {
alpha: f64,
value: Option<f64>,
}
impl Ewma {
pub fn new(alpha: f64) -> Self {
assert!(alpha > 0.0 && alpha <= 1.0, "alpha must be in (0, 1]");
Self { alpha, value: None }
}
pub fn observe(&mut self, x: f64) {
self.value = Some(match self.value {
None => x,
Some(prev) => self.alpha * x + (1.0 - self.alpha) * prev,
});
}
pub fn value(&self) -> Option<f64> {
self.value
}
}
#[derive(Debug, Clone)]
pub struct RateWindow {
window_secs: u64,
buckets: Vec<u64>,
head: usize,
anchor: Option<Instant>,
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct SpikeScore {
pub rate: f64,
pub baseline: f64,
pub z_score: f64,
}
impl RateWindow {
pub fn new(window_secs: u64) -> Self {
assert!(window_secs >= 2, "window must hold at least 2 buckets");
Self {
window_secs,
buckets: vec![0u64; window_secs as usize],
head: 0,
anchor: None,
}
}
pub fn observe_and_score(&mut self, now: Instant) -> Option<SpikeScore> {
self.advance_to(now);
self.buckets[self.head] = self.buckets[self.head].saturating_add(1);
let n = self.buckets.len();
let prior: Vec<u64> = (0..n)
.filter(|&i| i != self.head)
.map(|i| self.buckets[i])
.collect();
let prior_sum: u64 = prior.iter().sum();
let populated = prior.iter().filter(|&&v| v > 0).count();
if populated < (n / 2) {
return None;
}
let mean = prior_sum as f64 / prior.len() as f64;
let var = prior
.iter()
.map(|&v| {
let d = v as f64 - mean;
d * d
})
.sum::<f64>()
/ prior.len() as f64;
let std = var.sqrt();
if std <= 0.0 {
return None;
}
let rate = self.buckets[self.head] as f64;
let z = (rate - mean) / std;
Some(SpikeScore {
rate,
baseline: mean,
z_score: z,
})
}
fn advance_to(&mut self, now: Instant) {
let anchor = match self.anchor {
None => {
self.anchor = Some(now);
return;
}
Some(a) => a,
};
let elapsed = now.duration_since(anchor);
let secs_advanced = elapsed.as_secs();
if secs_advanced == 0 {
return;
}
let cap = self.window_secs.min(secs_advanced);
for _ in 0..cap {
self.head = (self.head + 1) % self.buckets.len();
self.buckets[self.head] = 0;
}
self.anchor = Some(anchor + Duration::from_secs(secs_advanced));
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn ewma_seeds_with_first_observation() {
let mut e = Ewma::new(0.5);
assert_eq!(e.value(), None);
e.observe(10.0);
assert_eq!(e.value(), Some(10.0));
}
#[test]
fn ewma_smooths_subsequent_observations() {
let mut e = Ewma::new(0.5);
e.observe(10.0);
e.observe(20.0);
assert!((e.value().unwrap() - 15.0).abs() < 1e-9);
e.observe(30.0);
assert!((e.value().unwrap() - 22.5).abs() < 1e-9);
}
#[test]
fn rate_window_returns_none_before_window_fills() {
let mut w = RateWindow::new(10);
let r = w.observe_and_score(Instant::now());
assert!(r.is_none());
}
#[test]
fn rate_window_scores_a_clean_spike() {
let mut w = RateWindow::new(10);
let mut t = Instant::now();
for i in 0..9 {
t += Duration::from_secs(1);
for _ in 0..(1 + (i % 2) as u32) {
let _ = w.observe_and_score(t);
}
}
t += Duration::from_secs(1);
let mut last = None;
for _ in 0..50 {
last = w.observe_and_score(t);
}
let score = last.expect("should have a score after window fills");
assert!(
score.z_score > 5.0,
"expected a large z-score, got {:?}",
score
);
}
#[test]
fn rate_window_zero_fills_idle_gaps() {
let mut w = RateWindow::new(5);
let t = Instant::now();
for _ in 0..3 {
let _ = w.observe_and_score(t);
}
let later = t + Duration::from_secs(10);
let _ = w.observe_and_score(later);
let r = w.observe_and_score(later);
assert!(r.is_none(), "all-zero prior should produce no score");
}
#[test]
fn rate_window_panics_on_too_small_window() {
let res = std::panic::catch_unwind(|| RateWindow::new(1));
assert!(res.is_err());
}
#[test]
fn ewma_panics_on_invalid_alpha() {
let res = std::panic::catch_unwind(|| Ewma::new(0.0));
assert!(res.is_err());
let res = std::panic::catch_unwind(|| Ewma::new(1.5));
assert!(res.is_err());
}
}