anomstream_core/thresholded/stats.rs
1//! Exponential-moving mean and variance of the anomaly-score stream.
2//!
3//! [`EmaStats`] tracks the running mean and variance of the per-point
4//! scores a [`crate::ThresholdedForest`] observes, using a single decay
5//! factor `α ∈ (0, 1]`. The recurrence is
6//!
7//! ```text
8//! delta = x − mean
9//! mean' = mean + α · delta
10//! var' = (1 − α) · (var + α · delta²)
11//! ```
12//!
13//! which is the standard exponentially-weighted Welford update:
14//! `mean'` is the biased EMA and `var'` is the EMA of the squared
15//! deviation *about the previous mean* — the correct estimator under
16//! exponential weighting (West 1979).
17//!
18//! The decay factor controls the effective memory window: with
19//! `α = 0.01` the statistics "forget" roughly after `1 / α = 100`
20//! observations. [`EmaStats`] is also the place where `observations`
21//! is counted so callers (threshold derivation, warmup gating) can
22//! refuse to emit an anomaly verdict before enough data has been
23//! seen.
24
25use alloc::format;
26
27#[cfg(not(feature = "std"))]
28#[allow(unused_imports)]
29use num_traits::Float;
30
31use crate::error::{RcfError, RcfResult};
32
33/// Exponentially-weighted running mean + variance.
34///
35/// Invariants:
36/// - `decay` is finite, in `(0, 1]`.
37/// - `observations` is monotonically non-decreasing.
38/// - `mean` and `variance` are finite whenever `observations > 0`.
39#[derive(Debug, Clone, PartialEq)]
40#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
41pub struct EmaStats {
42 /// EMA of the observed values.
43 mean: f64,
44 /// EMA of squared deviation about the previous mean.
45 variance: f64,
46 /// Per-update smoothing factor.
47 decay: f64,
48 /// Total number of [`EmaStats::update`] calls since construction
49 /// or [`EmaStats::reset`].
50 observations: u64,
51}
52
53impl EmaStats {
54 /// Build a fresh tracker with the supplied smoothing factor.
55 ///
56 /// # Errors
57 ///
58 /// Returns [`RcfError::InvalidConfig`] when `decay` is non-finite
59 /// or falls outside `(0.0, 1.0]`.
60 pub fn new(decay: f64) -> RcfResult<Self> {
61 if !decay.is_finite() || decay <= 0.0 || decay > 1.0 {
62 return Err(RcfError::InvalidConfig(
63 format!("EmaStats decay must be in (0.0, 1.0], got {decay}").into(),
64 ));
65 }
66 Ok(Self {
67 mean: 0.0,
68 variance: 0.0,
69 decay,
70 observations: 0,
71 })
72 }
73
74 /// Fold a new observation into the running statistics.
75 ///
76 /// Non-finite inputs are silently ignored — callers should reject
77 /// or sanitise `NaN`/`±∞` before feeding them in. The observation
78 /// counter is still incremented only on accepted inputs so that
79 /// [`EmaStats::observations`] reflects the size of the sample the
80 /// statistics were actually built from.
81 pub fn update(&mut self, value: f64) {
82 if !value.is_finite() {
83 return;
84 }
85 let delta = value - self.mean;
86 if self.observations == 0 {
87 // Bootstrap: first sample is the exact mean, variance stays
88 // at zero. Avoids an initial spike that would drag the EMA
89 // toward 0 for callers observing scores far from origin.
90 self.mean = value;
91 self.variance = 0.0;
92 } else {
93 self.mean += self.decay * delta;
94 self.variance = (1.0 - self.decay) * (self.variance + self.decay * delta * delta);
95 }
96 self.observations = self.observations.saturating_add(1);
97 }
98
99 /// Running mean estimate. Zero when no observation has been folded
100 /// in yet.
101 #[must_use]
102 pub fn mean(&self) -> f64 {
103 self.mean
104 }
105
106 /// Running variance estimate (always non-negative by construction).
107 #[must_use]
108 pub fn variance(&self) -> f64 {
109 self.variance.max(0.0)
110 }
111
112 /// Running standard deviation estimate.
113 #[must_use]
114 pub fn stddev(&self) -> f64 {
115 self.variance().sqrt()
116 }
117
118 /// Number of observations folded in since the last [`EmaStats::reset`].
119 #[must_use]
120 pub fn observations(&self) -> u64 {
121 self.observations
122 }
123
124 /// Configured smoothing factor.
125 #[must_use]
126 pub fn decay(&self) -> f64 {
127 self.decay
128 }
129
130 /// Drop every aggregated quantity and restart from the bootstrap
131 /// state. Used by the thresholded detector's own `reset` path and
132 /// by tests.
133 pub fn reset(&mut self) {
134 self.mean = 0.0;
135 self.variance = 0.0;
136 self.observations = 0;
137 }
138}
139
140#[cfg(test)]
141#[allow(clippy::float_cmp)] // Tests assert exact equality on bootstrap + closed-form expectations.
142mod tests {
143 use super::*;
144
145 #[test]
146 fn new_rejects_non_finite_decay() {
147 assert!(EmaStats::new(f64::NAN).is_err());
148 assert!(EmaStats::new(f64::INFINITY).is_err());
149 }
150
151 #[test]
152 fn new_rejects_non_positive_decay() {
153 assert!(EmaStats::new(0.0).is_err());
154 assert!(EmaStats::new(-0.1).is_err());
155 }
156
157 #[test]
158 fn new_rejects_decay_above_one() {
159 assert!(EmaStats::new(1.001).is_err());
160 }
161
162 #[test]
163 fn new_accepts_decay_at_one() {
164 // decay=1 is legal (full replacement every update).
165 EmaStats::new(1.0).unwrap();
166 }
167
168 #[test]
169 fn first_update_sets_mean_exactly() {
170 let mut s = EmaStats::new(0.1).unwrap();
171 s.update(7.0);
172 assert_eq!(s.mean(), 7.0);
173 assert_eq!(s.variance(), 0.0);
174 assert_eq!(s.observations(), 1);
175 }
176
177 #[test]
178 fn non_finite_update_is_ignored() {
179 let mut s = EmaStats::new(0.1).unwrap();
180 s.update(f64::NAN);
181 s.update(f64::INFINITY);
182 assert_eq!(s.observations(), 0);
183 assert_eq!(s.mean(), 0.0);
184 }
185
186 #[test]
187 fn mean_tracks_constant_stream_with_zero_variance() {
188 let mut s = EmaStats::new(0.1).unwrap();
189 for _ in 0..1000 {
190 s.update(5.0);
191 }
192 assert!((s.mean() - 5.0).abs() < 1e-9);
193 assert!(s.variance() < 1e-12);
194 }
195
196 #[test]
197 fn variance_tracks_spread() {
198 let mut s = EmaStats::new(0.05).unwrap();
199 // Alternating ±1 around 0 → variance should settle near 1.
200 for i in 0..5_000 {
201 let v = if i % 2 == 0 { 1.0 } else { -1.0 };
202 s.update(v);
203 }
204 // Mean should be near 0, stddev near 1.
205 assert!(s.mean().abs() < 0.1);
206 assert!(s.stddev() > 0.5);
207 assert!(s.stddev() < 1.5);
208 }
209
210 #[test]
211 fn reset_clears_state() {
212 let mut s = EmaStats::new(0.1).unwrap();
213 for i in 0..10 {
214 s.update(f64::from(i));
215 }
216 assert!(s.observations() > 0);
217 s.reset();
218 assert_eq!(s.mean(), 0.0);
219 assert_eq!(s.variance(), 0.0);
220 assert_eq!(s.observations(), 0);
221 }
222
223 #[test]
224 fn observations_saturates_at_u64_max() {
225 let mut s = EmaStats::new(1.0).unwrap();
226 s.observations = u64::MAX;
227 s.update(1.0);
228 assert_eq!(s.observations(), u64::MAX);
229 }
230
231 #[test]
232 fn variance_is_never_negative() {
233 // A pathological sequence that floats near zero should still
234 // keep variance >= 0 when read via the accessor.
235 let mut s = EmaStats::new(0.5).unwrap();
236 s.update(1e-300);
237 s.update(-1e-300);
238 assert!(s.variance() >= 0.0);
239 }
240}