anomstream_core/adwin.rs
1//! `ADWIN` (`ADaptive` `WINdowing`) — streaming change-point
2//! detector with automatic window sizing.
3//!
4//! Bifet & Gavaldà, *Learning from Time-Changing Data with
5//! Adaptive Windowing*, SIAM SDM 2007. The window holds recent
6//! observations; whenever the mean over two adjacent sub-windows
7//! differs by more than a Hoeffding-style bound, the older
8//! sub-window is dropped. The remaining window is the longest
9//! suffix still consistent with a single distribution.
10//!
11//! This implementation is a bounded-capacity simplification of
12//! Bifet's ADWIN2 (no exponential histograms): a ring buffer of
13//! the last `N` items + prefix-sum scans on each `update`. O(N)
14//! per update where `N` is the configured window cap. For the
15//! use-case in `anomstream-core` (drift on score streams, `N ≤ 4096`) the
16//! constant factors dominate over the logarithmic win of
17//! exponential histograms.
18//!
//! # Drift statistic
//!
//! Let `n` be the current window length (never more than the
//! configured cap `N`). For every split point
//! `i ∈ [MIN_SUBWINDOW_LEN, n − MIN_SUBWINDOW_LEN]` — both
//! sub-windows must hold at least [`MIN_SUBWINDOW_LEN`] items:
//!
//! ```text
//! mean_L = (1/n_L) · Σ_{j < i} x_j
//! mean_R = (1/n_R) · Σ_{j ≥ i} x_j
//! m      = 1 / (1/n_L + 1/n_R)          (harmonic mean of sizes)
//! ε_cut  = range · sqrt((1 / (2m)) · ln(4 · n / δ))
//! ```
//!
//! Drift fires when `|mean_L − mean_R| > ε_cut` for any split.
//! `range` is the caller-declared amplitude of the stream; it
//! rescales the Hoeffding bound, so streams bounded in `[0, 1]`
//! pass `range = 1.0` and the factor disappears.
33//!
34//! # Use with anomstream-core
35//!
36//! `AdwinDetector` is a standalone trigger — feed it the anomaly
37//! score stream (or any scalar per-step signal) and route
38//! [`AdwinDetector::update`]'s `true` return into
39//! [`crate::DriftAwareForest::on_drift`] to spawn a shadow forest.
40
41#![cfg(feature = "std")]
42
43use std::sync::Arc;
44
45use crate::error::{RcfError, RcfResult};
46use crate::metrics::{MetricsSink, default_sink, names};
47
/// Default confidence budget `δ`. A smaller `δ` widens the cut
/// threshold, so the detector fires less often on sampling noise.
/// `0.002` is the `δ = 0.002` setting used in Bifet's reference
/// experiments.
pub const DEFAULT_DELTA: f64 = 0.002;

/// Default cap on the number of retained observations. At one
/// observation per second, `4096` items cover roughly an hour of
/// stream without excess memory.
pub const DEFAULT_WINDOW_CAP: usize = 4_096;

/// Smallest sub-window (on either side of a split) that is compared
/// against the Hoeffding bound. Shorter sub-windows are never
/// evaluated — their means are too noisy to separate drift from
/// sampling jitter.
pub const MIN_SUBWINDOW_LEN: usize = 16;
61
62/// ADWIN-style streaming change-point detector.
63#[derive(Debug, Clone)]
64#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
65pub struct AdwinDetector {
66 /// Caller-declared amplitude of the observed stream. Scales
67 /// the Hoeffding bound — bounded `[0, 1]` streams pass `1.0`.
68 range: f64,
69 /// Confidence budget — lower δ => stricter threshold.
70 delta: f64,
71 /// Maximum items kept in the window.
72 window_cap: usize,
73 /// Oldest-first ring of recent observations.
74 buffer: Vec<f64>,
75 /// Cumulative drift fires reported.
76 drift_fires: u64,
77 /// Observability sink — serde-skipped, restored to the noop
78 /// sink on round-trip.
79 #[cfg_attr(
80 feature = "serde",
81 serde(skip, default = "crate::metrics::default_sink")
82 )]
83 metrics: Arc<dyn MetricsSink>,
84}
85
86impl AdwinDetector {
87 /// Build a detector with caller-chosen stream amplitude.
88 ///
89 /// # Errors
90 ///
91 /// Returns [`RcfError::InvalidConfig`] on non-finite / non-
92 /// positive `range`, `delta` outside `(0, 1)`, or
93 /// `window_cap < 2 · MIN_SUBWINDOW_LEN`.
94 pub fn new(range: f64, delta: f64, window_cap: usize) -> RcfResult<Self> {
95 if !range.is_finite() || range <= 0.0 {
96 return Err(RcfError::InvalidConfig(
97 format!("AdwinDetector: range must be finite and > 0, got {range}").into(),
98 ));
99 }
100 if !delta.is_finite() || !(0.0..1.0).contains(&delta) || delta <= 0.0 {
101 return Err(RcfError::InvalidConfig(
102 format!("AdwinDetector: delta must be in (0.0, 1.0), got {delta}").into(),
103 ));
104 }
105 if window_cap < 2 * MIN_SUBWINDOW_LEN {
106 return Err(RcfError::InvalidConfig(
107 format!(
108 "AdwinDetector: window_cap must be >= {}, got {window_cap}",
109 2 * MIN_SUBWINDOW_LEN
110 )
111 .into(),
112 ));
113 }
114 Ok(Self {
115 range,
116 delta,
117 window_cap,
118 buffer: Vec::with_capacity(window_cap),
119 drift_fires: 0,
120 metrics: default_sink(),
121 })
122 }
123
124 /// Install a metrics sink — every `update` emits an observed
125 /// counter, and every drift fire bumps a drift-fires counter.
126 #[must_use]
127 pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
128 self.metrics = sink;
129 self
130 }
131
132 /// Read-only handle to the installed sink.
133 #[must_use]
134 pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
135 &self.metrics
136 }
137
138 /// Convenience: range `1.0`, [`DEFAULT_DELTA`],
139 /// [`DEFAULT_WINDOW_CAP`].
140 ///
141 /// # Panics
142 ///
143 /// Never — the default values pass `new`'s validation.
144 #[must_use]
145 pub fn default_bounded() -> Self {
146 Self::new(1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP).expect("default params valid")
147 }
148
149 /// Current window length.
150 #[must_use]
151 pub fn len(&self) -> usize {
152 self.buffer.len()
153 }
154
155 /// `true` when the window holds no observations.
156 #[must_use]
157 pub fn is_empty(&self) -> bool {
158 self.buffer.is_empty()
159 }
160
161 /// Lifetime drift fires since construction.
162 #[must_use]
163 pub fn drift_fires(&self) -> u64 {
164 self.drift_fires
165 }
166
167 /// Running mean of the current window (empty → `0.0`).
168 #[must_use]
169 pub fn mean(&self) -> f64 {
170 if self.buffer.is_empty() {
171 return 0.0;
172 }
173 #[allow(clippy::cast_precision_loss)]
174 let n = self.buffer.len() as f64;
175 self.buffer.iter().sum::<f64>() / n
176 }
177
178 /// Fold `value` into the window. Evaluates every split point
179 /// against the Hoeffding bound; if any split flags drift, the
180 /// older sub-window is dropped and the return is `true`.
181 /// Returns `false` otherwise (no drift on this update).
182 ///
183 /// Non-finite values are silently dropped.
184 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
185 pub fn update(&mut self, value: f64) -> bool {
186 if !value.is_finite() {
187 return false;
188 }
189 self.metrics.inc_counter(names::ADWIN_OBSERVED_TOTAL, 1);
190 // Drop the oldest entry if we're at cap. Keeping this an
191 // `O(N)` front-removal is fine at N ≤ 4k — benchmarked
192 // marginal vs `VecDeque` in the typical anomstream-core use case.
193 if self.buffer.len() >= self.window_cap {
194 self.buffer.remove(0);
195 }
196 self.buffer.push(value);
197 let fired = self.detect_and_shrink();
198 if fired {
199 self.metrics.inc_counter(names::ADWIN_DRIFT_FIRES_TOTAL, 1);
200 }
201 fired
202 }
203
204 /// Drop every observation. Counters are preserved.
205 pub fn reset_window(&mut self) {
206 self.buffer.clear();
207 }
208
209 /// Scan every candidate split; drop the older sub-window on
210 /// the first drift signal found. Returns `true` iff drift was
211 /// detected.
212 fn detect_and_shrink(&mut self) -> bool {
213 let n = self.buffer.len();
214 if n < 2 * MIN_SUBWINDOW_LEN {
215 return false;
216 }
217 // Prefix sums for O(N) split evaluation.
218 let mut prefix = Vec::with_capacity(n + 1);
219 prefix.push(0.0_f64);
220 let mut acc = 0.0_f64;
221 for &v in &self.buffer {
222 acc += v;
223 prefix.push(acc);
224 }
225 let total = prefix[n];
226
227 for (split, &left_sum) in prefix
228 .iter()
229 .enumerate()
230 .take(n - MIN_SUBWINDOW_LEN + 1)
231 .skip(MIN_SUBWINDOW_LEN)
232 {
233 let n_left = split;
234 let n_right = n - split;
235 #[allow(clippy::cast_precision_loss)]
236 let nl = n_left as f64;
237 #[allow(clippy::cast_precision_loss)]
238 let nr = n_right as f64;
239 let mean_l = left_sum / nl;
240 let mean_r = (total - left_sum) / nr;
241 let m = 1.0 / (1.0 / nl + 1.0 / nr);
242 #[allow(clippy::cast_precision_loss)]
243 let log_term = (4.0 * n as f64 / self.delta).ln();
244 let eps_cut = self.range * (log_term / (2.0 * m)).sqrt();
245 if (mean_l - mean_r).abs() > eps_cut {
246 // Drop the older sub-window — keep the right (newer)
247 // side which is assumed to reflect the new regime.
248 self.buffer.drain(..split);
249 self.drift_fires = self.drift_fires.saturating_add(1);
250 return true;
251 }
252 }
253 false
254 }
255}
256
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Push `count` copies of `value`, ignoring the drift flag.
    fn feed(det: &mut AdwinDetector, value: f64, count: usize) {
        for _ in 0..count {
            let _ = det.update(value);
        }
    }

    /// Constructor validation: bad range, δ at both ends of the
    /// open interval, and a window cap below the minimum.
    #[test]
    fn new_rejects_invalid_params() {
        let rejected = |range: f64, delta: f64, cap: usize| {
            AdwinDetector::new(range, delta, cap).is_err()
        };
        assert!(rejected(-1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP));
        assert!(rejected(1.0, 0.0, DEFAULT_WINDOW_CAP));
        assert!(rejected(1.0, 1.0, DEFAULT_WINDOW_CAP));
        assert!(rejected(1.0, DEFAULT_DELTA, 5));
    }

    /// A constant stream must never report drift.
    #[test]
    fn stable_stream_does_not_fire() {
        let mut det = AdwinDetector::default_bounded();
        let all_quiet = (0..512).all(|_| !det.update(0.5));
        assert!(all_quiet);
        assert_eq!(det.drift_fires(), 0);
    }

    /// Baseline at 0.1, then a jump to 0.9 — the detector should
    /// fire within the post-shift samples fed here.
    #[test]
    fn mean_shift_triggers_drift() {
        let mut det = AdwinDetector::default_bounded();
        feed(&mut det, 0.1, 256);
        let fired = (0..128).any(|_| det.update(0.9));
        assert!(fired, "ADWIN missed mean shift from 0.1 → 0.9");
        assert!(det.drift_fires() >= 1);
    }

    /// NaN and infinity are rejected without entering the window.
    #[test]
    fn non_finite_is_ignored() {
        let mut det = AdwinDetector::default_bounded();
        for bad in [f64::NAN, f64::INFINITY] {
            assert!(!det.update(bad));
        }
        assert_eq!(det.len(), 0);
    }

    /// The window never grows past the configured cap.
    #[test]
    fn window_respects_cap() {
        let mut det = AdwinDetector::new(1.0, DEFAULT_DELTA, 64).unwrap();
        (0..200).map(|i| f64::from(i % 2)).for_each(|v| {
            let _ = det.update(v);
        });
        assert!(det.len() <= 64);
    }

    /// `reset_window` empties the buffer.
    #[test]
    fn reset_window_clears_buffer() {
        let mut det = AdwinDetector::default_bounded();
        feed(&mut det, 0.5, 100);
        det.reset_window();
        assert!(det.is_empty());
    }
}