Skip to main content

anomstream_core/thresholded/
stats.rs

1//! Exponential-moving mean and variance of the anomaly-score stream.
2//!
3//! [`EmaStats`] tracks the running mean and variance of the per-point
4//! scores a [`crate::ThresholdedForest`] observes, using a single decay
5//! factor `α ∈ (0, 1]`. The recurrence is
6//!
7//! ```text
8//! delta  = x − mean
9//! mean' = mean  + α · delta
10//! var'  = (1 − α) · (var + α · delta²)
11//! ```
12//!
13//! which is the standard exponentially-weighted Welford update:
14//! `mean'` is the biased EMA and `var'` is the EMA of the squared
15//! deviation *about the previous mean* — the correct estimator under
16//! exponential weighting (West 1979).
17//!
18//! The decay factor controls the effective memory window: with
19//! `α = 0.01` the statistics "forget" roughly after `1 / α = 100`
20//! observations. [`EmaStats`] is also the place where `observations`
21//! is counted so callers (threshold derivation, warmup gating) can
22//! refuse to emit an anomaly verdict before enough data has been
23//! seen.
24
25use alloc::format;
26
27#[cfg(not(feature = "std"))]
28#[allow(unused_imports)]
29use num_traits::Float;
30
31use crate::error::{RcfError, RcfResult};
32
33/// Exponentially-weighted running mean + variance.
34///
35/// Invariants:
36/// - `decay` is finite, in `(0, 1]`.
37/// - `observations` is monotonically non-decreasing.
38/// - `mean` and `variance` are finite whenever `observations > 0`.
39#[derive(Debug, Clone, PartialEq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41pub struct EmaStats {
42    /// EMA of the observed values.
43    mean: f64,
44    /// EMA of squared deviation about the previous mean.
45    variance: f64,
46    /// Per-update smoothing factor.
47    decay: f64,
48    /// Total number of [`EmaStats::update`] calls since construction
49    /// or [`EmaStats::reset`].
50    observations: u64,
51}
52
53impl EmaStats {
54    /// Build a fresh tracker with the supplied smoothing factor.
55    ///
56    /// # Errors
57    ///
58    /// Returns [`RcfError::InvalidConfig`] when `decay` is non-finite
59    /// or falls outside `(0.0, 1.0]`.
60    pub fn new(decay: f64) -> RcfResult<Self> {
61        if !decay.is_finite() || decay <= 0.0 || decay > 1.0 {
62            return Err(RcfError::InvalidConfig(
63                format!("EmaStats decay must be in (0.0, 1.0], got {decay}").into(),
64            ));
65        }
66        Ok(Self {
67            mean: 0.0,
68            variance: 0.0,
69            decay,
70            observations: 0,
71        })
72    }
73
74    /// Fold a new observation into the running statistics.
75    ///
76    /// Non-finite inputs are silently ignored — callers should reject
77    /// or sanitise `NaN`/`±∞` before feeding them in. The observation
78    /// counter is still incremented only on accepted inputs so that
79    /// [`EmaStats::observations`] reflects the size of the sample the
80    /// statistics were actually built from.
81    pub fn update(&mut self, value: f64) {
82        if !value.is_finite() {
83            return;
84        }
85        let delta = value - self.mean;
86        if self.observations == 0 {
87            // Bootstrap: first sample is the exact mean, variance stays
88            // at zero. Avoids an initial spike that would drag the EMA
89            // toward 0 for callers observing scores far from origin.
90            self.mean = value;
91            self.variance = 0.0;
92        } else {
93            self.mean += self.decay * delta;
94            self.variance = (1.0 - self.decay) * (self.variance + self.decay * delta * delta);
95        }
96        self.observations = self.observations.saturating_add(1);
97    }
98
99    /// Running mean estimate. Zero when no observation has been folded
100    /// in yet.
101    #[must_use]
102    pub fn mean(&self) -> f64 {
103        self.mean
104    }
105
106    /// Running variance estimate (always non-negative by construction).
107    #[must_use]
108    pub fn variance(&self) -> f64 {
109        self.variance.max(0.0)
110    }
111
112    /// Running standard deviation estimate.
113    #[must_use]
114    pub fn stddev(&self) -> f64 {
115        self.variance().sqrt()
116    }
117
118    /// Number of observations folded in since the last [`EmaStats::reset`].
119    #[must_use]
120    pub fn observations(&self) -> u64 {
121        self.observations
122    }
123
124    /// Configured smoothing factor.
125    #[must_use]
126    pub fn decay(&self) -> f64 {
127        self.decay
128    }
129
130    /// Drop every aggregated quantity and restart from the bootstrap
131    /// state. Used by the thresholded detector's own `reset` path and
132    /// by tests.
133    pub fn reset(&mut self) {
134        self.mean = 0.0;
135        self.variance = 0.0;
136        self.observations = 0;
137    }
138}
139
140#[cfg(test)]
141#[allow(clippy::float_cmp)] // Tests assert exact equality on bootstrap + closed-form expectations.
142mod tests {
143    use super::*;
144
145    #[test]
146    fn new_rejects_non_finite_decay() {
147        assert!(EmaStats::new(f64::NAN).is_err());
148        assert!(EmaStats::new(f64::INFINITY).is_err());
149    }
150
151    #[test]
152    fn new_rejects_non_positive_decay() {
153        assert!(EmaStats::new(0.0).is_err());
154        assert!(EmaStats::new(-0.1).is_err());
155    }
156
157    #[test]
158    fn new_rejects_decay_above_one() {
159        assert!(EmaStats::new(1.001).is_err());
160    }
161
162    #[test]
163    fn new_accepts_decay_at_one() {
164        // decay=1 is legal (full replacement every update).
165        EmaStats::new(1.0).unwrap();
166    }
167
168    #[test]
169    fn first_update_sets_mean_exactly() {
170        let mut s = EmaStats::new(0.1).unwrap();
171        s.update(7.0);
172        assert_eq!(s.mean(), 7.0);
173        assert_eq!(s.variance(), 0.0);
174        assert_eq!(s.observations(), 1);
175    }
176
177    #[test]
178    fn non_finite_update_is_ignored() {
179        let mut s = EmaStats::new(0.1).unwrap();
180        s.update(f64::NAN);
181        s.update(f64::INFINITY);
182        assert_eq!(s.observations(), 0);
183        assert_eq!(s.mean(), 0.0);
184    }
185
186    #[test]
187    fn mean_tracks_constant_stream_with_zero_variance() {
188        let mut s = EmaStats::new(0.1).unwrap();
189        for _ in 0..1000 {
190            s.update(5.0);
191        }
192        assert!((s.mean() - 5.0).abs() < 1e-9);
193        assert!(s.variance() < 1e-12);
194    }
195
196    #[test]
197    fn variance_tracks_spread() {
198        let mut s = EmaStats::new(0.05).unwrap();
199        // Alternating ±1 around 0 → variance should settle near 1.
200        for i in 0..5_000 {
201            let v = if i % 2 == 0 { 1.0 } else { -1.0 };
202            s.update(v);
203        }
204        // Mean should be near 0, stddev near 1.
205        assert!(s.mean().abs() < 0.1);
206        assert!(s.stddev() > 0.5);
207        assert!(s.stddev() < 1.5);
208    }
209
210    #[test]
211    fn reset_clears_state() {
212        let mut s = EmaStats::new(0.1).unwrap();
213        for i in 0..10 {
214            s.update(f64::from(i));
215        }
216        assert!(s.observations() > 0);
217        s.reset();
218        assert_eq!(s.mean(), 0.0);
219        assert_eq!(s.variance(), 0.0);
220        assert_eq!(s.observations(), 0);
221    }
222
223    #[test]
224    fn observations_saturates_at_u64_max() {
225        let mut s = EmaStats::new(1.0).unwrap();
226        s.observations = u64::MAX;
227        s.update(1.0);
228        assert_eq!(s.observations(), u64::MAX);
229    }
230
231    #[test]
232    fn variance_is_never_negative() {
233        // A pathological sequence that floats near zero should still
234        // keep variance >= 0 when read via the accessor.
235        let mut s = EmaStats::new(0.5).unwrap();
236        s.update(1e-300);
237        s.update(-1e-300);
238        assert!(s.variance() >= 0.0);
239    }
240}