Skip to main content

anomstream_core/
per_feature_cusum.rs

1//! Per-feature two-sided CUSUM change-point detector.
2//!
3//! `D` parallel univariate CUSUMs track positive and negative
4//! cumulative sums of the deviation from a reference mean.
5//! Alerts when either side exceeds the threshold `h` — detects
6//! *sustained* mean shifts that an `EWMA` adapts to and stops
7//! reporting (e.g. slow-ramp `DDoS`, gradual leak).
8//!
9//! Complementary to [`crate::meta_drift::MetaDriftDetector`]
10//! (scalar CUSUM on the RCF score stream): this module is
11//! per-feature CUSUM on raw observations, so caller can answer
12//! *which* feature drifted and in which direction.
13//!
14//! # CUSUM recurrence
15//!
16//! ```text
17//! S+ ← max(0, S+ + (x − μ₀ − k))
18//! S− ← max(0, S− − (x − μ₀ + k))
19//! alert when S+ > h  (increase)  or  S− > h  (decrease)
20//! ```
21//!
22//! `k` is the slack (allowable drift), `h` is the threshold,
23//! `μ₀` is the reference mean (auto-learned on the first
24//! observation unless overridden via [`PerFeatureCusum::set_reference`]).
25//!
26//! # References
27//!
28//! 1. E. S. Page, "Continuous Inspection Schemes",
29//!    *Biometrika* 41, 1954.
30//! 2. D. M. Hawkins & D. H. Olwell, *Cumulative Sum Charts and
31//!    Charting for Quality Improvement*, Springer, 1998.
32
33use alloc::vec::Vec;
34
35/// Direction of a detected change-point drift.
36#[derive(Debug, Clone, Copy, PartialEq, Eq)]
37#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
38#[non_exhaustive]
39pub enum DriftDirection {
40    /// Sustained increase above the reference mean.
41    Increase,
42    /// Sustained decrease below the reference mean.
43    Decrease,
44}
45
46/// One CUSUM alert — fired when a feature's positive or
47/// negative cumulative sum exceeds the threshold.
48#[derive(Debug, Clone, Copy, PartialEq)]
49#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
50pub struct PerFeatureCusumAlert {
51    /// Feature index that tripped (0-based into the observation
52    /// array).
53    pub feature_index: usize,
54    /// Which side of the two-sided chart fired.
55    pub direction: DriftDirection,
56    /// `max(S+, S−)` at the moment of the alert.
57    pub magnitude: f64,
58    /// Consecutive samples the drift has been building.
59    pub duration_samples: u64,
60}
61
62/// One univariate two-sided CUSUM accumulator.
63#[derive(Debug, Clone, Copy, PartialEq)]
64#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
65#[cfg_attr(
66    feature = "serde",
67    serde(try_from = "PerFeatureCusumAccumulatorShadow")
68)]
69pub struct PerFeatureCusumAccumulator {
70    /// Positive cumulative sum (detects increases).
71    pub s_pos: f64,
72    /// Negative cumulative sum (detects decreases).
73    pub s_neg: f64,
74    /// Reference mean `μ₀` (auto-learned or caller-set).
75    pub reference: f64,
76    /// Whether `reference` has been populated.
77    pub reference_set: bool,
78    /// Consecutive samples the current drift has been
79    /// accumulating.
80    pub drift_samples: u64,
81}
82
83/// Over-the-wire [`PerFeatureCusumAccumulator`] layout.
84/// Deserialization lands here first so [`TryFrom`] can reject
85/// `NaN` / `±inf` poisoning of the cumulative sums or the
86/// reference mean — fields that later feed the `update()`
87/// recurrence and would propagate non-finite state indefinitely.
88#[cfg(feature = "serde")]
89#[derive(serde::Serialize, serde::Deserialize)]
90#[allow(clippy::missing_docs_in_private_items)]
91struct PerFeatureCusumAccumulatorShadow {
92    s_pos: f64,
93    s_neg: f64,
94    reference: f64,
95    reference_set: bool,
96    drift_samples: u64,
97}
98
99#[cfg(feature = "serde")]
100impl TryFrom<PerFeatureCusumAccumulatorShadow> for PerFeatureCusumAccumulator {
101    type Error = crate::error::RcfError;
102
103    fn try_from(raw: PerFeatureCusumAccumulatorShadow) -> Result<Self, Self::Error> {
104        if !raw.s_pos.is_finite() || !raw.s_neg.is_finite() || !raw.reference.is_finite() {
105            return Err(crate::error::RcfError::InvalidConfig(alloc::format!(
106                "PerFeatureCusumAccumulator: non-finite field (s_pos={}, s_neg={}, reference={})",
107                raw.s_pos,
108                raw.s_neg,
109                raw.reference
110            ).into()));
111        }
112        if raw.s_pos < 0.0 || raw.s_neg < 0.0 {
113            return Err(crate::error::RcfError::InvalidConfig(alloc::format!(
114                "PerFeatureCusumAccumulator: cumulative sums must be non-negative (s_pos={}, s_neg={})",
115                raw.s_pos,
116                raw.s_neg
117            ).into()));
118        }
119        Ok(Self {
120            s_pos: raw.s_pos,
121            s_neg: raw.s_neg,
122            reference: raw.reference,
123            reference_set: raw.reference_set,
124            drift_samples: raw.drift_samples,
125        })
126    }
127}
128
129impl PerFeatureCusumAccumulator {
130    /// Fresh accumulator — zeroed, reference unset.
131    #[must_use]
132    pub const fn new() -> Self {
133        Self {
134            s_pos: 0.0,
135            s_neg: 0.0,
136            reference: 0.0,
137            reference_set: false,
138            drift_samples: 0,
139        }
140    }
141
142    /// Reset to the zero state.
143    pub fn reset(&mut self) {
144        *self = Self::new();
145    }
146
147    /// Current magnitude — `max(S+, S−)`. Used by the caller
148    /// to report a per-feature score even when no alert fired.
149    #[must_use]
150    pub fn magnitude(&self) -> f64 {
151        self.s_pos.max(self.s_neg)
152    }
153
154    /// Ingest `value` and return an alert when either side
155    /// exceeds `threshold`. First call seeds `reference = value`
156    /// and returns `None` unconditionally.
157    pub fn update(
158        &mut self,
159        value: f64,
160        slack: f64,
161        threshold: f64,
162        feature_index: usize,
163    ) -> Option<PerFeatureCusumAlert> {
164        if !self.reference_set {
165            self.reference = value;
166            self.reference_set = true;
167            return None;
168        }
169
170        self.s_pos = (self.s_pos + (value - self.reference - slack)).max(0.0);
171        self.s_neg = (self.s_neg - (value - self.reference + slack)).max(0.0);
172
173        if self.s_pos > threshold || self.s_neg > threshold {
174            self.drift_samples += 1;
175            let (direction, magnitude) = if self.s_pos > self.s_neg {
176                (DriftDirection::Increase, self.s_pos)
177            } else {
178                (DriftDirection::Decrease, self.s_neg)
179            };
180            Some(PerFeatureCusumAlert {
181                feature_index,
182                direction,
183                magnitude,
184                duration_samples: self.drift_samples,
185            })
186        } else {
187            self.drift_samples = 0;
188            None
189        }
190    }
191}
192
193impl Default for PerFeatureCusumAccumulator {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
199/// Hyper-parameters for [`PerFeatureCusum`].
200#[derive(Debug, Clone, Copy, PartialEq)]
201#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
202#[cfg_attr(feature = "serde", serde(try_from = "PerFeatureCusumConfigShadow"))]
203pub struct PerFeatureCusumConfig {
204    /// Slack `k` — allowable drift before accumulation starts.
205    /// Typical `0.5·σ` of the reference signal.
206    pub slack: f64,
207    /// Threshold `h` — cumulative sum at which an alert fires.
208    /// Typical `4·σ` of the reference signal.
209    pub threshold: f64,
210}
211
212/// Over-the-wire [`PerFeatureCusumConfig`] layout.
213/// Deserialization lands here first so [`TryFrom`] can enforce
214/// finite, non-negative `slack` and strictly-positive `threshold`
215/// — an attacker-supplied `threshold ≤ 0` would make every
216/// observation emit an alert.
217#[cfg(feature = "serde")]
218#[derive(serde::Serialize, serde::Deserialize)]
219#[allow(clippy::missing_docs_in_private_items)]
220struct PerFeatureCusumConfigShadow {
221    slack: f64,
222    threshold: f64,
223}
224
225#[cfg(feature = "serde")]
226impl TryFrom<PerFeatureCusumConfigShadow> for PerFeatureCusumConfig {
227    type Error = crate::error::RcfError;
228
229    fn try_from(raw: PerFeatureCusumConfigShadow) -> Result<Self, Self::Error> {
230        if !raw.slack.is_finite() || raw.slack < 0.0 {
231            return Err(crate::error::RcfError::InvalidConfig(
232                alloc::format!(
233                    "PerFeatureCusumConfig: slack must be finite and ≥ 0, got {}",
234                    raw.slack
235                )
236                .into(),
237            ));
238        }
239        if !raw.threshold.is_finite() || raw.threshold <= 0.0 {
240            return Err(crate::error::RcfError::InvalidConfig(
241                alloc::format!(
242                    "PerFeatureCusumConfig: threshold must be finite and > 0, got {}",
243                    raw.threshold
244                )
245                .into(),
246            ));
247        }
248        Ok(Self {
249            slack: raw.slack,
250            threshold: raw.threshold,
251        })
252    }
253}
254
255impl Default for PerFeatureCusumConfig {
256    fn default() -> Self {
257        Self {
258            slack: 0.5,
259            threshold: 5.0,
260        }
261    }
262}
263
264/// Result of one [`PerFeatureCusum::observe`] call.
265#[derive(Debug, Clone)]
266#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
267pub struct PerFeatureCusumResult<const D: usize> {
268    /// `max(S+, S−)` per feature at the moment the observation
269    /// returned — includes the current update.
270    #[cfg_attr(feature = "serde", serde(with = "crate::serde_util::fixed_array_f64"))]
271    pub per_feature_magnitude: [f64; D],
272    /// `max(per_feature_magnitude)` — single-number summary.
273    pub max_magnitude: f64,
274    /// Alerts fired this tick (one per feature that exceeded
275    /// `threshold`).
276    pub alerts: Vec<PerFeatureCusumAlert>,
277}
278
279/// `D` parallel two-sided CUSUMs sharing one `(slack, threshold)`
280/// configuration.
281///
282/// # Examples
283///
284/// ```
285/// use anomstream_core::{PerFeatureCusum, PerFeatureCusumConfig};
286///
287/// let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
288///     slack: 0.5,
289///     threshold: 5.0,
290/// });
291/// det.observe(&[100.0, 200.0]); // seeds references
292/// for _ in 0..20 {
293///     det.observe(&[105.0, 200.0]);
294/// }
295/// ```
296#[derive(Debug, Clone)]
297#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
298pub struct PerFeatureCusum<const D: usize> {
299    /// Per-dimension accumulator state.
300    #[cfg_attr(feature = "serde", serde(with = "serde_accumulators"))]
301    accumulators: [PerFeatureCusumAccumulator; D],
302    /// Active configuration.
303    config: PerFeatureCusumConfig,
304    /// Observations ingested so far.
305    total_samples: u64,
306}
307
308impl<const D: usize> PerFeatureCusum<D> {
309    /// Build an empty detector.
310    #[must_use]
311    pub const fn new(config: PerFeatureCusumConfig) -> Self {
312        Self {
313            accumulators: [PerFeatureCusumAccumulator::new(); D],
314            config,
315            total_samples: 0,
316        }
317    }
318
319    /// Active configuration.
320    #[must_use]
321    pub const fn config(&self) -> &PerFeatureCusumConfig {
322        &self.config
323    }
324
325    /// Observations ingested so far.
326    #[must_use]
327    pub const fn total_samples(&self) -> u64 {
328        self.total_samples
329    }
330
331    /// Per-dimension accumulator snapshot (read-only).
332    #[must_use]
333    pub const fn accumulators(&self) -> &[PerFeatureCusumAccumulator; D] {
334        &self.accumulators
335    }
336
337    /// Count of features currently in an active drift
338    /// (`drift_samples > 0`).
339    #[must_use]
340    pub fn active_drifts(&self) -> usize {
341        self.accumulators
342            .iter()
343            .filter(|a| a.drift_samples > 0)
344            .count()
345    }
346
347    /// Override the auto-learned reference mean per dimension.
348    /// Useful when feeding a stable external baseline (e.g. an
349    /// EWMA mean) rather than the first observation.
350    pub fn set_reference(&mut self, means: &[f64; D]) {
351        for (acc, &mean) in self.accumulators.iter_mut().zip(means.iter()) {
352            acc.reference = mean;
353            acc.reference_set = true;
354        }
355    }
356
357    /// Ingest `input`, returning per-feature magnitudes and any
358    /// alerts that fired. Always updates the accumulators.
359    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
360    pub fn observe(&mut self, input: &[f64; D]) -> PerFeatureCusumResult<D> {
361        let mut per_feature_magnitude = [0.0_f64; D];
362        let mut alerts: Vec<PerFeatureCusumAlert> = Vec::new();
363
364        for (i, &value) in input.iter().enumerate() {
365            let pre_magnitude = self.accumulators[i].magnitude();
366            per_feature_magnitude[i] = pre_magnitude;
367
368            if let Some(alert) =
369                self.accumulators[i].update(value, self.config.slack, self.config.threshold, i)
370            {
371                per_feature_magnitude[i] = alert.magnitude;
372                alerts.push(alert);
373            }
374        }
375
376        self.total_samples += 1;
377        let max_magnitude = per_feature_magnitude
378            .iter()
379            .copied()
380            .fold(0.0_f64, f64::max);
381
382        PerFeatureCusumResult {
383            per_feature_magnitude,
384            max_magnitude,
385            alerts,
386        }
387    }
388
389    /// Zero every accumulator and the sample counter.
390    pub fn reset(&mut self) {
391        for acc in &mut self.accumulators {
392            acc.reset();
393        }
394        self.total_samples = 0;
395    }
396}
397
398#[cfg(feature = "serde")]
399mod serde_accumulators {
400    //! `serde` adapter for `[PerFeatureCusumAccumulator; D]` —
401    //! derive macro does not cover arbitrary-`D` arrays.
402    use super::PerFeatureCusumAccumulator;
403    use alloc::vec::Vec;
404    use serde::{Deserialize, Deserializer, Serialize, Serializer};
405
406    /// Serialize `[PerFeatureCusumAccumulator; D]` as a length-prefixed slice.
407    pub fn serialize<S: Serializer, const D: usize>(
408        accs: &[PerFeatureCusumAccumulator; D],
409        s: S,
410    ) -> Result<S::Ok, S::Error> {
411        accs.as_slice().serialize(s)
412    }
413
414    /// Deserialize a length-prefixed slice back into `[PerFeatureCusumAccumulator; D]`.
415    pub fn deserialize<'de, DSer: Deserializer<'de>, const D: usize>(
416        d: DSer,
417    ) -> Result<[PerFeatureCusumAccumulator; D], DSer::Error> {
418        let v: Vec<PerFeatureCusumAccumulator> = Vec::deserialize(d)?;
419        if v.len() != D {
420            return Err(serde::de::Error::invalid_length(
421                v.len(),
422                &"expected D accumulators",
423            ));
424        }
425        let mut out = [PerFeatureCusumAccumulator::new(); D];
426        for (slot, acc) in out.iter_mut().zip(v) {
427            *slot = acc;
428        }
429        Ok(out)
430    }
431}
432
433#[cfg(test)]
434#[allow(clippy::float_cmp)]
435mod tests {
436    use super::*;
437
438    #[test]
439    fn first_observation_seeds_reference() {
440        let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
441            slack: 0.5,
442            threshold: 5.0,
443        });
444        let out = det.observe(&[100.0]);
445        assert!(out.alerts.is_empty());
446        assert!(det.accumulators()[0].reference_set);
447        assert_eq!(det.accumulators()[0].reference, 100.0);
448    }
449
450    #[test]
451    fn no_alert_on_stable_signal() {
452        let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
453            slack: 0.5,
454            threshold: 5.0,
455        });
456        for _ in 0..100 {
457            let out = det.observe(&[100.0]);
458            assert!(out.alerts.is_empty());
459        }
460    }
461
462    #[test]
463    fn detects_upward_ramp() {
464        let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
465            slack: 0.5,
466            threshold: 5.0,
467        });
468        let _ = det.observe(&[100.0]);
469        let mut alerted = false;
470        for _ in 0..20 {
471            let out = det.observe(&[105.0]);
472            if let Some(alert) = out.alerts.first() {
473                assert_eq!(alert.direction, DriftDirection::Increase);
474                assert_eq!(alert.feature_index, 0);
475                alerted = true;
476                break;
477            }
478        }
479        assert!(alerted);
480    }
481
482    #[test]
483    fn detects_downward_ramp() {
484        let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
485            slack: 0.5,
486            threshold: 5.0,
487        });
488        let _ = det.observe(&[100.0]);
489        let mut alerted = false;
490        for _ in 0..20 {
491            let out = det.observe(&[95.0]);
492            if let Some(alert) = out.alerts.first() {
493                assert_eq!(alert.direction, DriftDirection::Decrease);
494                alerted = true;
495                break;
496            }
497        }
498        assert!(alerted);
499    }
500
501    #[test]
502    fn drift_samples_counter_grows_then_resets() {
503        let mut det = PerFeatureCusum::<1>::new(PerFeatureCusumConfig {
504            slack: 0.5,
505            threshold: 5.0,
506        });
507        let _ = det.observe(&[100.0]);
508        for _ in 0..20 {
509            let _ = det.observe(&[105.0]);
510        }
511        assert!(det.accumulators()[0].drift_samples > 0);
512        // Return to reference — S+ decays by `slack` per tick.
513        // 20 steps of +4.5 each ≈ S+=90 at trip; 200 steps of
514        // −0.5 brings it back below threshold (5).
515        for _ in 0..250 {
516            let _ = det.observe(&[100.0]);
517        }
518        assert_eq!(det.accumulators()[0].drift_samples, 0);
519    }
520
521    #[test]
522    fn set_reference_overrides_auto_learn() {
523        let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
524            slack: 0.5,
525            threshold: 5.0,
526        });
527        det.set_reference(&[50.0, 100.0]);
528        assert!(det.accumulators()[0].reference_set);
529        assert_eq!(det.accumulators()[0].reference, 50.0);
530
531        // Feeding at the reference must not trigger alerts.
532        for _ in 0..50 {
533            let out = det.observe(&[50.0, 100.0]);
534            assert!(out.alerts.is_empty());
535        }
536    }
537
538    #[test]
539    fn max_magnitude_picks_largest_feature() {
540        let mut det = PerFeatureCusum::<3>::new(PerFeatureCusumConfig {
541            slack: 0.5,
542            threshold: 5.0,
543        });
544        let _ = det.observe(&[0.0, 0.0, 0.0]);
545        for _ in 0..20 {
546            let _ = det.observe(&[0.0, 10.0, 0.0]);
547        }
548        let out = det.observe(&[0.0, 10.0, 0.0]);
549        assert_eq!(out.max_magnitude, out.per_feature_magnitude[1]);
550        assert!(out.per_feature_magnitude[1] > out.per_feature_magnitude[0]);
551        assert!(out.per_feature_magnitude[1] > out.per_feature_magnitude[2]);
552    }
553
554    #[test]
555    fn reset_clears_state() {
556        let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
557            slack: 0.5,
558            threshold: 5.0,
559        });
560        let _ = det.observe(&[100.0, 200.0]);
561        for _ in 0..20 {
562            let _ = det.observe(&[110.0, 220.0]);
563        }
564        assert!(det.active_drifts() > 0);
565        det.reset();
566        assert_eq!(det.total_samples(), 0);
567        assert_eq!(det.active_drifts(), 0);
568        for acc in det.accumulators() {
569            assert!(!acc.reference_set);
570            assert_eq!(acc.s_pos, 0.0);
571            assert_eq!(acc.s_neg, 0.0);
572        }
573    }
574
575    #[test]
576    fn active_drifts_counts_per_feature() {
577        let mut det = PerFeatureCusum::<2>::new(PerFeatureCusumConfig {
578            slack: 0.5,
579            threshold: 5.0,
580        });
581        let _ = det.observe(&[100.0, 100.0]);
582        for _ in 0..20 {
583            let _ = det.observe(&[110.0, 100.0]);
584        }
585        // Only feature 0 is drifting.
586        assert_eq!(det.active_drifts(), 1);
587    }
588
589    #[cfg(all(feature = "serde", feature = "postcard"))]
590    #[test]
591    fn postcard_roundtrip_preserves_state() {
592        let mut det = PerFeatureCusum::<3>::new(PerFeatureCusumConfig {
593            slack: 0.5,
594            threshold: 5.0,
595        });
596        let _ = det.observe(&[100.0, 200.0, 300.0]);
597        for _ in 0..10 {
598            let _ = det.observe(&[105.0, 200.0, 300.0]);
599        }
600        let bytes = postcard::to_allocvec(&det).expect("serde ok");
601        let back: PerFeatureCusum<3> = postcard::from_bytes(&bytes).expect("serde ok");
602        assert_eq!(back.total_samples(), det.total_samples());
603        assert_eq!(back.accumulators()[0].s_pos, det.accumulators()[0].s_pos);
604        assert_eq!(
605            back.accumulators()[0].reference,
606            det.accumulators()[0].reference
607        );
608    }
609
610    #[cfg(all(feature = "serde", feature = "postcard"))]
611    #[test]
612    fn deserialize_rejects_nan_in_accumulator() {
613        let bad = PerFeatureCusumAccumulatorShadow {
614            s_pos: f64::NAN,
615            s_neg: 0.0,
616            reference: 0.0,
617            reference_set: true,
618            drift_samples: 0,
619        };
620        let bytes = postcard::to_allocvec(&bad).unwrap();
621        let back: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&bytes);
622        assert!(back.is_err());
623    }
624
625    #[cfg(all(feature = "serde", feature = "postcard"))]
626    #[test]
627    fn deserialize_rejects_negative_cumsum() {
628        let bad = PerFeatureCusumAccumulatorShadow {
629            s_pos: -1.0,
630            s_neg: 0.0,
631            reference: 0.0,
632            reference_set: true,
633            drift_samples: 0,
634        };
635        let bytes = postcard::to_allocvec(&bad).unwrap();
636        let back: Result<PerFeatureCusumAccumulator, _> = postcard::from_bytes(&bytes);
637        assert!(back.is_err());
638    }
639
640    #[cfg(all(feature = "serde", feature = "postcard"))]
641    #[test]
642    fn deserialize_rejects_non_positive_threshold() {
643        let bad = PerFeatureCusumConfigShadow {
644            slack: 0.5,
645            threshold: 0.0,
646        };
647        let bytes = postcard::to_allocvec(&bad).unwrap();
648        let back: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&bytes);
649        assert!(back.is_err());
650    }
651
652    #[cfg(all(feature = "serde", feature = "postcard"))]
653    #[test]
654    fn deserialize_rejects_nan_threshold() {
655        let bad = PerFeatureCusumConfigShadow {
656            slack: 0.5,
657            threshold: f64::NAN,
658        };
659        let bytes = postcard::to_allocvec(&bad).unwrap();
660        let back: Result<PerFeatureCusumConfig, _> = postcard::from_bytes(&bytes);
661        assert!(back.is_err());
662    }
663}