Skip to main content

anomstream_core/
meta_drift.rs

1//! CUSUM change-point detector over the anomaly-score stream.
2//!
3//! The per-point detectors in this crate ([`crate::RandomCutForest`]
4//! / [`crate::ThresholdedForest`]) answer *"is this point
5//! anomalous?"*. [`MetaDriftDetector`] answers the orthogonal
6//! second-order question *"is the distribution of anomaly scores
7//! itself shifting?"* by running a two-sided CUSUM (Page 1954) over
8//! the score stream.
9//!
10//! The distinction matters operationally:
11//!
12//! - a **single high score** is an individual anomaly — a potential
13//!   incident to triage;
14//! - a **sustained upward trend** in scores is baseline drift — the
15//!   threshold needs to adapt, not an incident to page;
16//! - a **sustained downward trend** is a quiescence event or a
17//!   detector becoming less sensitive over time.
18//!
19//! The existing [`crate::ThresholdedForest`] already tracks the
20//! score stream's EMA mean and variance — but CUSUM is strictly more
21//! sensitive than a `μ + zσ` gate for *small* persistent shifts,
22//! which is the signature of drift. CUSUM fires on many consecutive
23//! mild-deviation scores where a sigma-band test misses entirely.
24//!
25//! # Algorithm
26//!
27//! Two-sided tabular CUSUM, parameterised by:
28//!
29//! - `allowance_k` (slack in σ units, default `0.5`): how much
30//!   expected noise the detector absorbs before a shift starts
31//!   accumulating.
32//! - `threshold_h` (detection bound in σ units, default `5.0`): how
33//!   far the accumulator can drift from zero before firing. Higher
34//!   `h` = slower to fire, fewer false positives.
35//!
36//! ```text
37//! S⁺[t] = max(0, S⁺[t−1] + (x[t] − μ) − k·σ)   // upward-shift accumulator
38//! S⁻[t] = max(0, S⁻[t−1] + (μ − x[t]) − k·σ)   // downward-shift accumulator
39//! fire  = S⁺ > h·σ (upward) | S⁻ > h·σ (downward)
40//! ```
41//!
42//! `μ` and `σ` are maintained by the built-in [`EmaStats`] the
43//! detector owns — the CUSUM update uses the *previous* values so a
44//! fresh observation cannot influence the reference it is compared
45//! against.
46//!
47//! # Reset protocol
48//!
49//! Callers that react to a drift event should invoke
50//! [`MetaDriftDetector::reset`] afterwards so the CUSUM accumulators
51//! start over from zero. The EMA stats are kept — they *are* the new
52//! reference the post-drift distribution will be measured against.
53//! Use [`MetaDriftDetector::reset_stats`] to discard both the
54//! accumulators and the EMA, e.g. after a config change or a major
55//! regime switch.
56
57use alloc::format;
58
59use crate::error::{RcfError, RcfResult};
60use crate::thresholded::EmaStats;
61
/// Default allowance `k`, expressed in σ units: each CUSUM step
/// subtracts `k · stddev` from the deviation before accumulating it.
pub const DEFAULT_ALLOWANCE_K: f64 = 0.5;
/// Default decision bound `h`, expressed in σ units: an accumulator
/// has to climb above `h · stddev` for a drift to be reported.
pub const DEFAULT_THRESHOLD_H: f64 = 5.0;
/// Default number of samples the detector collects before it may
/// emit a non-warmup verdict.
pub const DEFAULT_MIN_OBSERVATIONS: u64 = 32;
/// Default smoothing factor of the EMA that tracks the score stream.
pub const DEFAULT_DECAY: f64 = 0.01;
72
/// Which way the anomaly-score stream moved when a drift was detected.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum DriftKind {
    /// Scores have been running persistently above the running mean.
    /// Typical of baseline drift, where traffic slowly becomes
    /// slightly "weirder".
    Upward,
    /// Scores have been running persistently below the running mean.
    /// Typical when the detector has adapted to what used to be
    /// anomalous, or when traffic quiesces.
    Downward,
}
87
/// Tunables of the CUSUM layer.
///
/// All fields are public, so a value is only known to be in range
/// once [`CusumConfig::validate`] has accepted it.
#[derive(Clone, Copy, Debug, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct CusumConfig {
    /// Allowance `k`, in σ units, subtracted from every deviation
    /// before it is accumulated.
    pub allowance_k: f64,
    /// Decision bound `h`, in σ units, an accumulator must exceed
    /// for a drift to be reported.
    pub threshold_h: f64,
    /// Number of samples that must have been seen before a
    /// non-warmup verdict may be emitted.
    pub min_observations: u64,
    /// Smoothing factor of the EMA over the score stream. Must lie
    /// in `(0, 1]`.
    pub decay: f64,
}
102
103impl Default for CusumConfig {
104    fn default() -> Self {
105        Self {
106            allowance_k: DEFAULT_ALLOWANCE_K,
107            threshold_h: DEFAULT_THRESHOLD_H,
108            min_observations: DEFAULT_MIN_OBSERVATIONS,
109            decay: DEFAULT_DECAY,
110        }
111    }
112}
113
114impl CusumConfig {
115    /// Validate every parameter.
116    ///
117    /// # Errors
118    ///
119    /// Returns [`RcfError::InvalidConfig`] when any field is outside
120    /// its accepted range: `allowance_k` and `threshold_h` must be
121    /// finite and non-negative (`allowance_k == 0` is legal — no
122    /// slack), `decay` must be finite and in `(0, 1]`.
123    pub fn validate(&self) -> RcfResult<()> {
124        if !self.allowance_k.is_finite() || self.allowance_k < 0.0 {
125            return Err(RcfError::InvalidConfig(
126                format!(
127                    "allowance_k must be finite and >= 0, got {}",
128                    self.allowance_k
129                )
130                .into(),
131            ));
132        }
133        if !self.threshold_h.is_finite() || self.threshold_h <= 0.0 {
134            return Err(RcfError::InvalidConfig(
135                format!(
136                    "threshold_h must be finite and > 0, got {}",
137                    self.threshold_h
138                )
139                .into(),
140            ));
141        }
142        if !self.decay.is_finite() || self.decay <= 0.0 || self.decay > 1.0 {
143            return Err(RcfError::InvalidConfig(
144                format!("decay must be in (0.0, 1.0], got {}", self.decay).into(),
145            ));
146        }
147        Ok(())
148    }
149}
150
151/// Verdict emitted by a single [`MetaDriftDetector::observe`] call.
152#[derive(Debug, Clone, Copy, PartialEq)]
153#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
154pub struct DriftVerdict {
155    /// Upward CUSUM accumulator `S⁺` after the observation.
156    pub s_high: f64,
157    /// Downward CUSUM accumulator `S⁻` after the observation.
158    pub s_low: f64,
159    /// Absolute detection threshold in effect (`threshold_h · stddev`).
160    pub threshold: f64,
161    /// EMA mean of the score stream before the observation was folded
162    /// in — the reference the CUSUM compared against.
163    pub mean: f64,
164    /// EMA stddev of the score stream before the observation.
165    pub stddev: f64,
166    /// Whether the detector had enough observations to emit a
167    /// meaningful verdict (`observations >= min_observations` and
168    /// `stddev > 0`). When `false`, [`Self::drift`] is always `None`.
169    pub ready: bool,
170    /// Direction of the drift, or `None` when neither accumulator
171    /// exceeded the threshold.
172    pub drift: Option<DriftKind>,
173}
174
175/// CUSUM drift detector.
176///
177/// Feed anomaly scores through [`Self::observe`] one by one; the
178/// detector returns a [`DriftVerdict`] every time. Reset the
179/// accumulators via [`Self::reset`] once the caller has acted on a
180/// drift event; reset the whole state (including the reference EMA)
181/// via [`Self::reset_stats`] after a config change.
182#[derive(Debug, Clone)]
183#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
184pub struct MetaDriftDetector {
185    /// Validated CUSUM config.
186    config: CusumConfig,
187    /// Running EMA of the score stream used for μ / σ.
188    stats: EmaStats,
189    /// Upward cumulative sum accumulator.
190    s_high: f64,
191    /// Downward cumulative sum accumulator.
192    s_low: f64,
193    /// Observability sink — emits the CUSUM accumulators + fire
194    /// counter. Defaults to [`crate::NoopSink`].
195    #[cfg(feature = "std")]
196    #[cfg_attr(
197        feature = "serde",
198        serde(skip, default = "crate::metrics::default_sink")
199    )]
200    metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
201}
202
203impl MetaDriftDetector {
204    /// Build a detector with the supplied config.
205    ///
206    /// # Errors
207    ///
208    /// Forwards [`CusumConfig::validate`] and [`EmaStats::new`]
209    /// errors.
210    pub fn new(config: CusumConfig) -> RcfResult<Self> {
211        config.validate()?;
212        let stats = EmaStats::new(config.decay)?;
213        Ok(Self {
214            config,
215            stats,
216            s_high: 0.0,
217            s_low: 0.0,
218            #[cfg(feature = "std")]
219            metrics: crate::metrics::default_sink(),
220        })
221    }
222
223    /// Install a [`crate::MetricsSink`] — emits
224    /// `rcf_drift_s_high` / `rcf_drift_s_low` histograms per call
225    /// and increments `rcf_drift_fires_total` on fire verdicts.
226    #[cfg(feature = "std")]
227    #[must_use]
228    pub fn with_metrics_sink(
229        mut self,
230        sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
231    ) -> Self {
232        self.metrics = sink;
233        self
234    }
235
236    /// Read-only handle to the installed sink.
237    #[cfg(feature = "std")]
238    #[must_use]
239    pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
240        &self.metrics
241    }
242
243    /// Default-configured detector.
244    ///
245    /// # Errors
246    ///
247    /// Same as [`Self::new`].
248    pub fn with_defaults() -> RcfResult<Self> {
249        Self::new(CusumConfig::default())
250    }
251
252    /// Read-only CUSUM config.
253    #[must_use]
254    pub fn config(&self) -> &CusumConfig {
255        &self.config
256    }
257
258    /// Running EMA of the score stream.
259    #[must_use]
260    pub fn stats(&self) -> &EmaStats {
261        &self.stats
262    }
263
264    /// Upward cumulative sum accumulator.
265    #[must_use]
266    pub fn s_high(&self) -> f64 {
267        self.s_high
268    }
269
270    /// Downward cumulative sum accumulator.
271    #[must_use]
272    pub fn s_low(&self) -> f64 {
273        self.s_low
274    }
275
276    /// Fold `score` into the detector and emit a verdict.
277    ///
278    /// Non-finite inputs are silently ignored and return a verdict
279    /// flagged as not-ready without mutating the detector.
280    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
281    pub fn observe(&mut self, score: f64) -> DriftVerdict {
282        if !score.is_finite() {
283            return DriftVerdict {
284                s_high: self.s_high,
285                s_low: self.s_low,
286                threshold: 0.0,
287                mean: self.stats.mean(),
288                stddev: self.stats.stddev(),
289                ready: false,
290                drift: None,
291            };
292        }
293
294        let prev_mean = self.stats.mean();
295        let prev_stddev = self.stats.stddev();
296        let prev_observations = self.stats.observations();
297        self.stats.update(score);
298
299        let ready = prev_observations >= self.config.min_observations && prev_stddev > 0.0;
300        if !ready {
301            return DriftVerdict {
302                s_high: self.s_high,
303                s_low: self.s_low,
304                threshold: 0.0,
305                mean: prev_mean,
306                stddev: prev_stddev,
307                ready: false,
308                drift: None,
309            };
310        }
311
312        let k = self.config.allowance_k * prev_stddev;
313        let h = self.config.threshold_h * prev_stddev;
314        let dev = score - prev_mean;
315
316        self.s_high = (self.s_high + dev - k).max(0.0);
317        self.s_low = (self.s_low - dev - k).max(0.0);
318
319        let drift = if self.s_high > h {
320            Some(DriftKind::Upward)
321        } else if self.s_low > h {
322            Some(DriftKind::Downward)
323        } else {
324            None
325        };
326
327        #[cfg(feature = "std")]
328        {
329            use crate::metrics::names;
330            self.metrics
331                .observe_histogram(names::DRIFT_S_HIGH, self.s_high);
332            self.metrics
333                .observe_histogram(names::DRIFT_S_LOW, self.s_low);
334            match drift {
335                Some(DriftKind::Upward) => {
336                    self.metrics.inc_counter(names::DRIFT_FIRES_TOTAL, 1);
337                    self.metrics.inc_counter(names::DRIFT_UP_TOTAL, 1);
338                }
339                Some(DriftKind::Downward) => {
340                    self.metrics.inc_counter(names::DRIFT_FIRES_TOTAL, 1);
341                    self.metrics.inc_counter(names::DRIFT_DOWN_TOTAL, 1);
342                }
343                None => {}
344            }
345        }
346
347        DriftVerdict {
348            s_high: self.s_high,
349            s_low: self.s_low,
350            threshold: h,
351            mean: prev_mean,
352            stddev: prev_stddev,
353            ready: true,
354            drift,
355        }
356    }
357
358    /// Clear the CUSUM accumulators while keeping the EMA reference
359    /// intact. Call this after reacting to a drift event — the new
360    /// distribution becomes the next reference.
361    pub fn reset(&mut self) {
362        self.s_high = 0.0;
363        self.s_low = 0.0;
364    }
365
366    /// Clear everything — accumulators and EMA reference. Returns
367    /// the detector to its warmup state.
368    pub fn reset_stats(&mut self) {
369        self.s_high = 0.0;
370        self.s_low = 0.0;
371        self.stats.reset();
372    }
373}
374
#[cfg(test)]
#[allow(clippy::float_cmp)] // Accumulators are compared against exact closed-form values.
mod tests {
    use super::*;

    /// Assemble a config from positional parameters.
    fn cfg(k: f64, h: f64, min_obs: u64, decay: f64) -> CusumConfig {
        CusumConfig {
            allowance_k: k,
            threshold_h: h,
            min_observations: min_obs,
            decay,
        }
    }

    /// Detector with `k = 0.5`, an 8-sample warmup, EMA decay `0.1`
    /// and the supplied `h`.
    fn detector(h: f64) -> MetaDriftDetector {
        MetaDriftDetector::new(cfg(0.5, h, 8, 0.1)).unwrap()
    }

    /// Feed `count` samples, alternating `even` / `odd` starting with
    /// `even`, and discard the verdicts.
    fn feed_alternating(d: &mut MetaDriftDetector, count: usize, even: f64, odd: f64) {
        for i in 0..count {
            let sample = if i % 2 == 0 { even } else { odd };
            let _ = d.observe(sample);
        }
    }

    /// Feed `count` copies of `value` and discard the verdicts.
    fn feed_constant(d: &mut MetaDriftDetector, count: usize, value: f64) {
        for _ in 0..count {
            let _ = d.observe(value);
        }
    }

    #[test]
    fn default_config_validates() {
        CusumConfig::default().validate().unwrap();
    }

    #[test]
    fn validate_rejects_negative_allowance_k() {
        let outcome = cfg(-0.1, DEFAULT_THRESHOLD_H, 8, DEFAULT_DECAY).validate();
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_rejects_zero_threshold_h() {
        let outcome = cfg(DEFAULT_ALLOWANCE_K, 0.0, 8, DEFAULT_DECAY).validate();
        assert!(outcome.is_err());
    }

    #[test]
    fn validate_rejects_decay_outside_range() {
        // Lower bound (exclusive), above the upper bound, and NaN.
        for bad_decay in [0.0, 1.5, f64::NAN] {
            let outcome = cfg(DEFAULT_ALLOWANCE_K, DEFAULT_THRESHOLD_H, 8, bad_decay).validate();
            assert!(outcome.is_err());
        }
    }

    #[test]
    fn warmup_never_fires() {
        let mut d = detector(5.0);
        // `detector` configures an 8-sample warmup, so none of the
        // first eight calls may be ready.
        for _ in 0..8 {
            let verdict = d.observe(1.0);
            assert!(!verdict.ready);
            assert!(verdict.drift.is_none());
        }
    }

    #[test]
    fn constant_stream_does_not_fire() {
        // A perfectly constant stream is expected to keep the EMA
        // stddev at zero, so the detector never becomes ready.
        let mut d = detector(5.0);
        for _ in 0..200 {
            let verdict = d.observe(1.0);
            assert!(verdict.drift.is_none());
        }
        // With no CUSUM step ever taken, both accumulators are still 0.
        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
    }

    #[test]
    fn upward_shift_fires_upward() {
        let mut d = detector(3.0);
        // Warmup on a noisy baseline around 1.0.
        feed_alternating(&mut d, 64, 0.95, 1.05);
        // Sustained upward shift; stop at the first upward verdict.
        let saw_upward = (0..100).any(|_| matches!(d.observe(5.0).drift, Some(DriftKind::Upward)));
        assert!(saw_upward, "CUSUM should fire upward on sustained shift");
    }

    #[test]
    fn downward_shift_fires_downward() {
        let mut d = detector(3.0);
        // Warmup on a noisy baseline around 5.0.
        feed_alternating(&mut d, 64, 4.95, 5.05);
        // Sustained downward shift; stop at the first downward verdict.
        let saw_downward =
            (0..100).any(|_| matches!(d.observe(1.0).drift, Some(DriftKind::Downward)));
        assert!(
            saw_downward,
            "CUSUM should fire downward on sustained shift"
        );
    }

    #[test]
    fn non_finite_input_ignored() {
        let mut d = detector(3.0);
        feed_constant(&mut d, 16, 1.0);
        let seen_before = d.stats().observations();
        let nan_verdict = d.observe(f64::NAN);
        let inf_verdict = d.observe(f64::INFINITY);
        assert!(nan_verdict.drift.is_none());
        assert!(inf_verdict.drift.is_none());
        assert_eq!(d.stats().observations(), seen_before);
    }

    #[test]
    fn reset_clears_accumulators_but_keeps_stats() {
        let mut d = detector(3.0);
        feed_alternating(&mut d, 64, 0.95, 1.05);
        feed_constant(&mut d, 50, 5.0);
        assert!(d.s_high() > 0.0);
        let seen_before = d.stats().observations();
        d.reset();
        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
        assert_eq!(
            d.stats().observations(),
            seen_before,
            "reset() must keep the EMA reference"
        );
    }

    #[test]
    fn reset_stats_clears_everything() {
        let mut d = detector(3.0);
        feed_constant(&mut d, 64, 1.0);
        d.reset_stats();
        assert_eq!(d.s_high(), 0.0);
        assert_eq!(d.s_low(), 0.0);
        assert_eq!(d.stats().observations(), 0);
    }

    #[test]
    fn verdict_exposes_reference_mean_and_stddev() {
        let mut d = detector(5.0);
        feed_constant(&mut d, 32, 2.0);
        let verdict = d.observe(2.5);
        // The verdict carries the *pre-update* mean/stddev, so after a
        // run of `2.0` inputs the reported mean should still sit near
        // 2.0 even though the latest sample was 2.5.
        assert!((verdict.mean - 2.0).abs() < 0.5);
        assert!(verdict.stddev >= 0.0);
    }

    #[test]
    fn with_defaults_builds() {
        let d = MetaDriftDetector::with_defaults().unwrap();
        let config = d.config();
        assert_eq!(config.allowance_k, DEFAULT_ALLOWANCE_K);
        assert_eq!(config.threshold_h, DEFAULT_THRESHOLD_H);
    }
}