Skip to main content

anomstream_core/
adwin.rs

1//! `ADWIN` (`ADaptive` `WINdowing`) — streaming change-point
2//! detector with automatic window sizing.
3//!
4//! Bifet & Gavaldà, *Learning from Time-Changing Data with
5//! Adaptive Windowing*, SIAM SDM 2007. The window holds recent
6//! observations; whenever the mean over two adjacent sub-windows
7//! differs by more than a Hoeffding-style bound, the older
8//! sub-window is dropped. The remaining window is the longest
9//! suffix still consistent with a single distribution.
10//!
11//! This implementation is a bounded-capacity simplification of
12//! Bifet's ADWIN2 (no exponential histograms): a ring buffer of
13//! the last `N` items + prefix-sum scans on each `update`. O(N)
14//! per update where `N` is the configured window cap. For the
15//! use-case in `anomstream-core` (drift on score streams, `N ≤ 4096`) the
16//! constant factors dominate over the logarithmic win of
17//! exponential histograms.
18//!
//! # Drift statistic
//!
//! Let `n` be the current window length (`n ≤ N`). For every split
//! point `i` that leaves at least [`MIN_SUBWINDOW_LEN`] items on each
//! side, i.e. `i ∈ [MIN_SUBWINDOW_LEN, n − MIN_SUBWINDOW_LEN]`:
//!
//! ```text
//! n_L    = i,   n_R = n − i
//! mean_L = (1/n_L) · Σ_{j < i} x_j
//! mean_R = (1/n_R) · Σ_{j ≥ i} x_j
//! m      = 1 / (1/n_L + 1/n_R)                  (harmonic mean of sizes)
//! ε_cut  = sqrt((1 / (2m)) · ln(4 · n / δ)) · (range)
//! ```
//!
//! Drift fires when `|mean_L − mean_R| > ε_cut` for any split; the
//! scan stops at the first (oldest) split that satisfies it.
//! `range` is the caller-declared amplitude of the stream (for
//! streams bounded in `[0, 1]` pass `1.0` and the factor drops out).
33//!
34//! # Use with anomstream-core
35//!
36//! `AdwinDetector` is a standalone trigger — feed it the anomaly
37//! score stream (or any scalar per-step signal) and route
38//! [`AdwinDetector::update`]'s `true` return into
39//! [`crate::DriftAwareForest::on_drift`] to spawn a shadow forest.
40
41#![cfg(feature = "std")]
42
43use std::sync::Arc;
44
45use crate::error::{RcfError, RcfResult};
46use crate::metrics::{MetricsSink, default_sink, names};
47
/// Confidence budget `δ` used by [`AdwinDetector::default_bounded`].
///
/// A smaller `δ` widens the Hoeffding bound, so the detector fires
/// less often on noise. `0.002` matches Bifet's reference
/// `δ = 0.002` experiment setting.
pub const DEFAULT_DELTA: f64 = 0.002;

/// Window cap (maximum retained observations) used by
/// [`AdwinDetector::default_bounded`].
///
/// `4096` items cover roughly an hour of data at one observation
/// per second without excess memory.
pub const DEFAULT_WINDOW_CAP: usize = 4096;

/// Smallest sub-window length that is compared against the
/// Hoeffding bound.
///
/// Splits leaving fewer items on either side are not evaluated, so
/// the detector stays silent until the window holds at least twice
/// this many observations — shorter sub-window means are too noisy
/// to tell drift from sampling jitter.
pub const MIN_SUBWINDOW_LEN: usize = 16;
61
62/// ADWIN-style streaming change-point detector.
63#[derive(Debug, Clone)]
64#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
65pub struct AdwinDetector {
66    /// Caller-declared amplitude of the observed stream. Scales
67    /// the Hoeffding bound — bounded `[0, 1]` streams pass `1.0`.
68    range: f64,
69    /// Confidence budget — lower δ => stricter threshold.
70    delta: f64,
71    /// Maximum items kept in the window.
72    window_cap: usize,
73    /// Oldest-first ring of recent observations.
74    buffer: Vec<f64>,
75    /// Cumulative drift fires reported.
76    drift_fires: u64,
77    /// Observability sink — serde-skipped, restored to the noop
78    /// sink on round-trip.
79    #[cfg_attr(
80        feature = "serde",
81        serde(skip, default = "crate::metrics::default_sink")
82    )]
83    metrics: Arc<dyn MetricsSink>,
84}
85
86impl AdwinDetector {
87    /// Build a detector with caller-chosen stream amplitude.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`RcfError::InvalidConfig`] on non-finite / non-
92    /// positive `range`, `delta` outside `(0, 1)`, or
93    /// `window_cap < 2 · MIN_SUBWINDOW_LEN`.
94    pub fn new(range: f64, delta: f64, window_cap: usize) -> RcfResult<Self> {
95        if !range.is_finite() || range <= 0.0 {
96            return Err(RcfError::InvalidConfig(
97                format!("AdwinDetector: range must be finite and > 0, got {range}").into(),
98            ));
99        }
100        if !delta.is_finite() || !(0.0..1.0).contains(&delta) || delta <= 0.0 {
101            return Err(RcfError::InvalidConfig(
102                format!("AdwinDetector: delta must be in (0.0, 1.0), got {delta}").into(),
103            ));
104        }
105        if window_cap < 2 * MIN_SUBWINDOW_LEN {
106            return Err(RcfError::InvalidConfig(
107                format!(
108                    "AdwinDetector: window_cap must be >= {}, got {window_cap}",
109                    2 * MIN_SUBWINDOW_LEN
110                )
111                .into(),
112            ));
113        }
114        Ok(Self {
115            range,
116            delta,
117            window_cap,
118            buffer: Vec::with_capacity(window_cap),
119            drift_fires: 0,
120            metrics: default_sink(),
121        })
122    }
123
124    /// Install a metrics sink — every `update` emits an observed
125    /// counter, and every drift fire bumps a drift-fires counter.
126    #[must_use]
127    pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
128        self.metrics = sink;
129        self
130    }
131
132    /// Read-only handle to the installed sink.
133    #[must_use]
134    pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
135        &self.metrics
136    }
137
138    /// Convenience: range `1.0`, [`DEFAULT_DELTA`],
139    /// [`DEFAULT_WINDOW_CAP`].
140    ///
141    /// # Panics
142    ///
143    /// Never — the default values pass `new`'s validation.
144    #[must_use]
145    pub fn default_bounded() -> Self {
146        Self::new(1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP).expect("default params valid")
147    }
148
149    /// Current window length.
150    #[must_use]
151    pub fn len(&self) -> usize {
152        self.buffer.len()
153    }
154
155    /// `true` when the window holds no observations.
156    #[must_use]
157    pub fn is_empty(&self) -> bool {
158        self.buffer.is_empty()
159    }
160
161    /// Lifetime drift fires since construction.
162    #[must_use]
163    pub fn drift_fires(&self) -> u64 {
164        self.drift_fires
165    }
166
167    /// Running mean of the current window (empty → `0.0`).
168    #[must_use]
169    pub fn mean(&self) -> f64 {
170        if self.buffer.is_empty() {
171            return 0.0;
172        }
173        #[allow(clippy::cast_precision_loss)]
174        let n = self.buffer.len() as f64;
175        self.buffer.iter().sum::<f64>() / n
176    }
177
178    /// Fold `value` into the window. Evaluates every split point
179    /// against the Hoeffding bound; if any split flags drift, the
180    /// older sub-window is dropped and the return is `true`.
181    /// Returns `false` otherwise (no drift on this update).
182    ///
183    /// Non-finite values are silently dropped.
184    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
185    pub fn update(&mut self, value: f64) -> bool {
186        if !value.is_finite() {
187            return false;
188        }
189        self.metrics.inc_counter(names::ADWIN_OBSERVED_TOTAL, 1);
190        // Drop the oldest entry if we're at cap. Keeping this an
191        // `O(N)` front-removal is fine at N ≤ 4k — benchmarked
192        // marginal vs `VecDeque` in the typical anomstream-core use case.
193        if self.buffer.len() >= self.window_cap {
194            self.buffer.remove(0);
195        }
196        self.buffer.push(value);
197        let fired = self.detect_and_shrink();
198        if fired {
199            self.metrics.inc_counter(names::ADWIN_DRIFT_FIRES_TOTAL, 1);
200        }
201        fired
202    }
203
204    /// Drop every observation. Counters are preserved.
205    pub fn reset_window(&mut self) {
206        self.buffer.clear();
207    }
208
209    /// Scan every candidate split; drop the older sub-window on
210    /// the first drift signal found. Returns `true` iff drift was
211    /// detected.
212    fn detect_and_shrink(&mut self) -> bool {
213        let n = self.buffer.len();
214        if n < 2 * MIN_SUBWINDOW_LEN {
215            return false;
216        }
217        // Prefix sums for O(N) split evaluation.
218        let mut prefix = Vec::with_capacity(n + 1);
219        prefix.push(0.0_f64);
220        let mut acc = 0.0_f64;
221        for &v in &self.buffer {
222            acc += v;
223            prefix.push(acc);
224        }
225        let total = prefix[n];
226
227        for (split, &left_sum) in prefix
228            .iter()
229            .enumerate()
230            .take(n - MIN_SUBWINDOW_LEN + 1)
231            .skip(MIN_SUBWINDOW_LEN)
232        {
233            let n_left = split;
234            let n_right = n - split;
235            #[allow(clippy::cast_precision_loss)]
236            let nl = n_left as f64;
237            #[allow(clippy::cast_precision_loss)]
238            let nr = n_right as f64;
239            let mean_l = left_sum / nl;
240            let mean_r = (total - left_sum) / nr;
241            let m = 1.0 / (1.0 / nl + 1.0 / nr);
242            #[allow(clippy::cast_precision_loss)]
243            let log_term = (4.0 * n as f64 / self.delta).ln();
244            let eps_cut = self.range * (log_term / (2.0 * m)).sqrt();
245            if (mean_l - mean_r).abs() > eps_cut {
246                // Drop the older sub-window — keep the right (newer)
247                // side which is assumed to reflect the new regime.
248                self.buffer.drain(..split);
249                self.drift_fires = self.drift_fires.saturating_add(1);
250                return true;
251            }
252        }
253        false
254    }
255}
256
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Each tuple violates exactly one constructor precondition.
    #[test]
    fn new_rejects_invalid_params() {
        let rejected = [
            (-1.0, DEFAULT_DELTA, DEFAULT_WINDOW_CAP),
            (1.0, 0.0, DEFAULT_WINDOW_CAP),
            (1.0, 1.0, DEFAULT_WINDOW_CAP),
            (1.0, DEFAULT_DELTA, 5),
        ];
        for (range, delta, window_cap) in rejected {
            assert!(AdwinDetector::new(range, delta, window_cap).is_err());
        }
    }

    /// A constant stream must never report drift.
    #[test]
    fn stable_stream_does_not_fire() {
        let mut detector = AdwinDetector::default_bounded();
        let always_quiet = (0..512).all(|_| !detector.update(0.5));
        assert!(always_quiet);
        assert_eq!(detector.drift_fires(), 0);
    }

    /// A jump of the mean from 0.1 to 0.9 must be flagged within
    /// 128 post-shift samples.
    #[test]
    fn mean_shift_triggers_drift() {
        let mut detector = AdwinDetector::default_bounded();
        // Baseline regime: 256 samples at 0.1.
        (0..256).for_each(|_| {
            let _ = detector.update(0.1);
        });
        // New regime: stop feeding as soon as the detector fires.
        let fired = (0..128).any(|_| detector.update(0.9));
        assert!(fired, "ADWIN missed mean shift from 0.1 → 0.9");
        assert!(detector.drift_fires() >= 1);
    }

    /// NaN and infinity are rejected and leave the window empty.
    #[test]
    fn non_finite_is_ignored() {
        let mut detector = AdwinDetector::default_bounded();
        for bad in [f64::NAN, f64::INFINITY] {
            assert!(!detector.update(bad));
        }
        assert_eq!(detector.len(), 0);
    }

    /// The window never grows past the configured cap.
    #[test]
    fn window_respects_cap() {
        let mut detector = AdwinDetector::new(1.0, DEFAULT_DELTA, 64).unwrap();
        (0..200_i32).map(|i| f64::from(i % 2)).for_each(|sample| {
            let _ = detector.update(sample);
        });
        assert!(detector.len() <= 64);
    }

    /// `reset_window` empties the buffer.
    #[test]
    fn reset_window_clears_buffer() {
        let mut detector = AdwinDetector::default_bounded();
        (0..100).for_each(|_| {
            let _ = detector.update(0.5);
        });
        detector.reset_window();
        assert!(detector.is_empty());
    }
}