Skip to main content

anomstream_core/
shingled.rs

1//! Internal shingling on top of [`crate::RandomCutForest`].
2//!
3//! Turns a **scalar stream** into a `D`-dim feature vector by keeping
4//! the last `D` observations in a ring buffer. Each new scalar shifts
5//! the window and emits a fresh `[f64; D]` to the forest. Isolation-
6//! depth scoring on the shingled view captures **temporal
7//! autocorrelation** that bare scalar scoring cannot — a dwell
8//! anomaly at constant rate (NAB `rogue_agent_key_hold`) does not
9//! expand the forest's bounding box on the raw scalar, but on the
10//! shingled vector the anomalous subsequence sits far from the
11//! baseline subsequences in the `D`-dim shingle space.
12//!
13//! Matches the shape of AWS Java's `RotateShingle` (random cut forest
14//! with internal ring buffer). This module is the RCF-side fix for
15//! the `rogue_agent_key_hold` = 0.145 / `SWaT` = 0.282 failures
16//! documented in `docs/performance.md`.
17//!
18//! # Build
19//!
20//! ```ignore
21//! use anomstream_core::ShingledForestBuilder;
22//!
23//! let mut forest = ShingledForestBuilder::<32>::new()
24//!     .num_trees(100)
25//!     .sample_size(256)
26//!     .seed(2026)
27//!     .build()?;
28//!
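//! for sample in stream_of_scalars {
//!     // Score first: `score_scalar` appends `sample` to the current
//!     // window without mutating it, whereas scoring after
//!     // `update_scalar` would append `sample` a second time.
//!     if forest.is_warmed() {
//!         let score = forest.score_scalar(sample)?;
//!         if f64::from(score) > 1.5 { eprintln!("contextual anomaly"); }
//!     }
//!     let _submitted = forest.update_scalar(sample)?;
//! }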
35//! ```
36//!
37//! # Shingled embedding shape
38//!
39//! For shingle size `D`, the emitted vector is
//! `[v_{t-D+1}, …, v_{t-1}, v_t]` — oldest-first, newest-last. The
//! first `D` scalars only fill the ring buffer, so `update_scalar`
//! returns `false` for them. Every later call first submits the window
//! completed by the previous call and then rotates in the new scalar,
//! returning `true` — the forest therefore lags the ring by one shingle.
44//!
45//! # When to z-score
46//!
47//! Scaling is the caller's job. For NDR feature dims with wildly
48//! different magnitudes (packet-rate, entropy, port-count), z-score
49//! each scalar against its warm-phase `(mean, stddev)` **before**
//! handing it to [`ShingledForest::update_scalar`] — RCF cuts are
//! range-weighted, so un-normalised inputs let whichever dimension
//! carries the biggest range dominate every cut.
53
54#![cfg(feature = "std")]
55
56use crate::domain::{AnomalyScore, DiVector};
57use crate::error::{RcfError, RcfResult};
58use crate::forest::RandomCutForest;
59use crate::{ForestBuilder, RcfConfig};
60
61/// Builder producing a [`ShingledForest`]. Delegates every RCF
62/// hyperparameter to [`ForestBuilder`] — the only extra is the
63/// compile-time shingle size which equals the forest
64/// dimensionality `D`.
65///
66/// The const-generic `D` **is** the shingle size: one shingled
67/// vector = last `D` scalars.
68#[derive(Debug)]
69pub struct ShingledForestBuilder<const D: usize> {
70    /// Underlying bare-forest builder — full passthrough of every
71    /// tuning knob.
72    inner: ForestBuilder<D>,
73}
74
75impl<const D: usize> Default for ShingledForestBuilder<D> {
76    fn default() -> Self {
77        Self::new()
78    }
79}
80
81impl<const D: usize> ShingledForestBuilder<D> {
82    /// Start a fresh builder with the bare-forest defaults.
83    #[must_use]
84    pub fn new() -> Self {
85        Self {
86            inner: ForestBuilder::<D>::new(),
87        }
88    }
89
90    /// Number of trees — forwarded to [`ForestBuilder::num_trees`].
91    #[must_use]
92    pub fn num_trees(mut self, trees: usize) -> Self {
93        self.inner = self.inner.num_trees(trees);
94        self
95    }
96
97    /// Sample size — forwarded to [`ForestBuilder::sample_size`].
98    #[must_use]
99    pub fn sample_size(mut self, sample: usize) -> Self {
100        self.inner = self.inner.sample_size(sample);
101        self
102    }
103
104    /// Master seed — forwarded to [`ForestBuilder::seed`].
105    #[must_use]
106    pub fn seed(mut self, seed: u64) -> Self {
107        self.inner = self.inner.seed(seed);
108        self
109    }
110
111    /// Time-decay — forwarded to [`ForestBuilder::time_decay`].
112    #[must_use]
113    pub fn time_decay(mut self, decay: f64) -> Self {
114        self.inner = self.inner.time_decay(decay);
115        self
116    }
117
118    /// Fetch the resolved [`RcfConfig`] that [`Self::build`] would
119    /// use — mirrors [`ForestBuilder::config`].
120    #[must_use]
121    pub fn config(&self) -> &RcfConfig {
122        self.inner.config()
123    }
124
125    /// Build the shingled forest. Fails exactly when the underlying
126    /// [`ForestBuilder::build`] fails.
127    ///
128    /// # Errors
129    ///
130    /// Propagates [`ForestBuilder::build`] errors.
131    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
132    pub fn build(self) -> RcfResult<ShingledForest<D>> {
133        let forest = self.inner.build()?;
134        Ok(ShingledForest {
135            forest,
136            ring: [0.0_f64; D],
137            filled: 0,
138            cursor: 0,
139            warmed: false,
140        })
141    }
142}
143
144/// `D`-dim shingled wrapper over [`RandomCutForest`] — scalar-stream
145/// input, internal ring buffer of the last `D` samples.
146///
147/// The ring buffer is stored oldest-to-newest logically but laid out
148/// as a **circular array** internally — constant-time update with no
149/// allocation. [`ShingledForest::current_shingle`] exposes the
150/// logical shingle in read-only form (oldest-first) for diagnostics.
151pub struct ShingledForest<const D: usize> {
152    /// Wrapped bare forest operating on shingled `[f64; D]` points.
153    forest: RandomCutForest<D>,
154    /// Circular storage for the last `D` scalars. `cursor` points
155    /// to the slot that will be overwritten on the next update.
156    ring: [f64; D],
157    /// Scalars received since construction / last `reset` — saturates
158    /// at `D`, used by [`Self::is_warmed`].
159    filled: usize,
160    /// Next write position in `ring`.
161    cursor: usize,
162    /// `true` once at least one full shingle has been submitted to
163    /// the forest.
164    warmed: bool,
165}
166
167impl<const D: usize> core::fmt::Debug for ShingledForest<D> {
168    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
169        f.debug_struct("ShingledForest")
170            .field("shingle_size", &D)
171            .field("filled", &self.filled)
172            .field("warmed", &self.warmed)
173            .finish_non_exhaustive()
174    }
175}
176
177impl<const D: usize> ShingledForest<D> {
178    /// Shingle size (equals the compile-time `D`).
179    #[must_use]
180    pub const fn shingle_size(&self) -> usize {
181        D
182    }
183
184    /// Whether the ring buffer holds a full `D`-scalar window and
185    /// the forest has received at least one shingle.
186    #[must_use]
187    pub const fn is_warmed(&self) -> bool {
188        self.warmed
189    }
190
191    /// Immutable view of the underlying bare forest — use this to
192    /// inspect tree state, read metrics, or route through the
193    /// [`RandomCutForest::forensic_baseline`] / `attribution`
194    /// helpers on the already-shingled last point.
195    #[must_use]
196    pub fn forest(&self) -> &RandomCutForest<D> {
197        &self.forest
198    }
199
200    /// Mutable escape hatch — handy for bootstrap replay
201    /// ([`RandomCutForest::bootstrap`]) when the caller has
202    /// pre-shingled their warm-up corpus.
203    pub fn forest_mut(&mut self) -> &mut RandomCutForest<D> {
204        &mut self.forest
205    }
206
207    /// Snapshot the current shingle in logical order (oldest-first).
208    /// Returns `None` while the ring is still partially empty.
209    #[must_use]
210    pub fn current_shingle(&self) -> Option<[f64; D]> {
211        if self.filled < D {
212            return None;
213        }
214        Some(self.materialise_shingle())
215    }
216
217    /// Fold `value` into the ring buffer; once the ring is full,
218    /// forward the shingled window to the forest. Returns `true`
219    /// when the shingle was submitted to the forest (i.e. the ring
220    /// was full before the call).
221    ///
222    /// # Errors
223    ///
224    /// - [`RcfError::NaNValue`] on non-finite `value`.
225    /// - Propagates [`RandomCutForest::update`] failures once the
226    ///   shingle is submitted.
227    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
228    pub fn update_scalar(&mut self, value: f64) -> RcfResult<bool> {
229        if !value.is_finite() {
230            return Err(RcfError::NaNValue);
231        }
232        // Submit the *previous* shingle before rotating the ring —
233        // the new scalar becomes the newest entry of the shingle
234        // seen by the forest next call.
235        let submitted = if self.filled >= D {
236            let shingle = self.materialise_shingle();
237            self.forest.update(shingle)?;
238            self.warmed = true;
239            true
240        } else {
241            false
242        };
243        // Rotate the ring.
244        self.ring[self.cursor] = value;
245        self.cursor = (self.cursor + 1) % D;
246        if self.filled < D {
247            self.filled += 1;
248        }
249        Ok(submitted)
250    }
251
252    /// Score `value` against the frozen forest **without** folding
253    /// it into the ring buffer. The query uses the current shingle
254    /// with `value` appended as the newest slot — matches what a
255    /// subsequent [`Self::update_scalar`] would submit.
256    ///
257    /// # Errors
258    ///
259    /// - [`RcfError::NaNValue`] on non-finite `value`.
260    /// - [`RcfError::EmptyForest`] before the ring buffer is full
261    ///   or the forest has not yet received its first update.
262    /// - Propagates [`RandomCutForest::score`] failures.
263    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
264    pub fn score_scalar(&self, value: f64) -> RcfResult<AnomalyScore> {
265        if !value.is_finite() {
266            return Err(RcfError::NaNValue);
267        }
268        let shingle = self.shingle_with(value)?;
269        self.forest.score(&shingle)
270    }
271
272    /// Attribution on the shingle formed by appending `value` to
273    /// the current ring. Returns a `D`-dim [`DiVector`] where each
274    /// dim is a **lag index** (0 = oldest, `D-1` = newest / `value`).
275    ///
276    /// # Errors
277    ///
278    /// Same as [`Self::score_scalar`].
279    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
280    pub fn attribution_scalar(&self, value: f64) -> RcfResult<DiVector> {
281        if !value.is_finite() {
282            return Err(RcfError::NaNValue);
283        }
284        let shingle = self.shingle_with(value)?;
285        self.forest.attribution(&shingle)
286    }
287
288    /// Stateless codisp on the shingle formed with `value` appended.
289    /// Non-mutating — preserves the frozen-baseline contract across
290    /// long streams. Prefer this over the mutating `score_codisp`
291    /// path for shingled forensic replay.
292    ///
293    /// # Errors
294    ///
295    /// Same as [`Self::score_scalar`].
296    pub fn score_codisp_stateless_scalar(&self, value: f64) -> RcfResult<AnomalyScore> {
297        if !value.is_finite() {
298            return Err(RcfError::NaNValue);
299        }
300        let shingle = self.shingle_with(value)?;
301        self.forest.score_codisp_stateless(&shingle)
302    }
303
304    /// Drop the ring buffer and reset the warm-up flag; the
305    /// underlying forest is **not** reset — callers who want a
306    /// full state wipe should rebuild.
307    pub fn reset_ring(&mut self) {
308        self.ring = [0.0_f64; D];
309        self.filled = 0;
310        self.cursor = 0;
311        self.warmed = false;
312    }
313
314    /// Logical oldest-first materialisation of the ring.
315    fn materialise_shingle(&self) -> [f64; D] {
316        let mut out = [0.0_f64; D];
317        // `cursor` points at the slot that will be overwritten
318        // next, which equals the position of the *oldest* entry
319        // when the ring is full.
320        for (i, slot) in out.iter_mut().enumerate() {
321            *slot = self.ring[(self.cursor + i) % D];
322        }
323        out
324    }
325
326    /// Build the shingle that would result from appending `value`
327    /// to the current ring, without mutating the ring.
328    fn shingle_with(&self, value: f64) -> RcfResult<[f64; D]> {
329        if self.filled < D {
330            return Err(RcfError::EmptyForest);
331        }
332        let mut out = [0.0_f64; D];
333        // Drop the oldest entry (at `cursor`), shift the rest left
334        // by one, append `value` as newest.
335        for (i, slot) in out.iter_mut().enumerate().take(D - 1) {
336            *slot = self.ring[(self.cursor + 1 + i) % D];
337        }
338        out[D - 1] = value;
339        Ok(out)
340    }
341}
342
#[cfg(test)]
// Tests unwrap/panic freely, compare exact floats built from small
// integers, and cast small loop counters to `f64`.
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_lossless,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Small `D = 4` forest shared by most tests.
    fn small() -> ShingledForest<4> {
        ShingledForestBuilder::<4>::new()
            .num_trees(50)
            .sample_size(64)
            .seed(2026)
            .build()
            .unwrap()
    }

    #[test]
    fn warm_up_requires_d_scalars() {
        let mut f = small();
        for i in 0..3 {
            let submitted = f.update_scalar(i as f64).unwrap();
            assert!(!submitted, "shouldn't submit before ring is full");
            assert!(!f.is_warmed());
        }
        // 4th scalar fills the ring but the next update is when the
        // first *shingle* lands in the forest.
        let submitted = f.update_scalar(3.0).unwrap();
        assert!(!submitted);
        assert_eq!(f.current_shingle(), Some([0.0, 1.0, 2.0, 3.0]));
        // 5th scalar — now previous shingle [0,1,2,3] gets submitted.
        let submitted = f.update_scalar(4.0).unwrap();
        assert!(submitted);
        assert!(f.is_warmed());
        assert_eq!(f.current_shingle(), Some([1.0, 2.0, 3.0, 4.0]));
    }

    #[test]
    fn submissions_lag_ring_fill_by_one() {
        let mut f = small();
        let submitted = (0..10)
            .filter(|&i| f.update_scalar(f64::from(i)).unwrap())
            .count();
        // The first D = 4 calls only fill the ring; calls 5..=10 submit.
        assert_eq!(submitted, 6);
    }

    #[test]
    fn ring_wraps_in_oldest_first_order() {
        let mut f = small();
        for i in 0..10 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        // 10 writes into a 4-slot ring: the cursor has wrapped twice.
        assert_eq!(f.current_shingle(), Some([6.0, 7.0, 8.0, 9.0]));
        assert_eq!(f.shingle_with(42.0).unwrap(), [7.0, 8.0, 9.0, 42.0]);
    }

    #[test]
    fn update_scalar_rejects_nan() {
        let mut f = small();
        assert!(matches!(
            f.update_scalar(f64::NAN).unwrap_err(),
            RcfError::NaNValue
        ));
        assert!(matches!(
            f.update_scalar(f64::INFINITY).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn rejected_update_leaves_ring_untouched() {
        let mut f = small();
        for i in 0..4 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        let before = f.current_shingle();
        assert!(f.update_scalar(f64::NAN).is_err());
        assert_eq!(f.current_shingle(), before);
        assert!(!f.is_warmed());
    }

    #[test]
    fn scalar_queries_reject_non_finite() {
        // Non-finite input wins over the not-yet-full ring.
        assert!(matches!(
            small().score_scalar(f64::NAN),
            Err(RcfError::NaNValue)
        ));
        let mut f = small();
        for i in 0..10 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        assert!(matches!(f.score_scalar(f64::NAN), Err(RcfError::NaNValue)));
        assert!(matches!(
            f.attribution_scalar(f64::INFINITY),
            Err(RcfError::NaNValue)
        ));
        assert!(matches!(
            f.score_codisp_stateless_scalar(f64::NEG_INFINITY),
            Err(RcfError::NaNValue)
        ));
    }

    #[test]
    fn score_before_warm_fails() {
        let f = small();
        assert!(matches!(
            f.score_scalar(1.0).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn score_after_warm_returns_non_negative() {
        let mut f = small();
        for i in 0..200 {
            let _ = f.update_scalar(i as f64 * 0.01).unwrap();
        }
        let s: f64 = f.score_scalar(10.0).unwrap().into();
        assert!(s.is_finite());
        assert!(s >= 0.0);
    }

    #[test]
    fn outlier_scalar_scores_higher_than_in_cluster() {
        let mut f = ShingledForestBuilder::<8>::new()
            .num_trees(100)
            .sample_size(128)
            .seed(7)
            .build()
            .unwrap();
        // Warm on a tight cluster.
        let mut tick = 0.0_f64;
        for _ in 0..1_000 {
            let _ = f.update_scalar((tick.sin() + 1.0) * 0.1).unwrap();
            tick += 0.1;
        }
        // In-cluster probe.
        let normal: f64 = f.score_scalar(0.10).unwrap().into();
        // Outlier probe.
        let outlier: f64 = f.score_scalar(100.0).unwrap().into();
        assert!(
            outlier > normal,
            "outlier {outlier} should exceed in-cluster {normal}"
        );
    }

    #[test]
    fn shingle_with_does_not_mutate_ring() {
        let mut f = small();
        for i in 0..5 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        let before = f.current_shingle().unwrap();
        let _ = f.score_scalar(99.0).unwrap();
        let after = f.current_shingle().unwrap();
        assert_eq!(before, after);
    }

    #[test]
    fn reset_ring_clears_warm_state_but_preserves_forest() {
        let mut f = small();
        for i in 0..10 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        assert!(f.is_warmed());
        f.reset_ring();
        assert!(!f.is_warmed());
        assert_eq!(f.current_shingle(), None);
        // Forest still holds its leaves — a fresh shingle submission
        // after re-warming should score against the prior baseline.
        for i in 0..10 {
            let _ = f.update_scalar(i as f64).unwrap();
        }
        let s: f64 = f.score_scalar(100.0).unwrap().into();
        assert!(s.is_finite());
    }

    #[test]
    fn codisp_stateless_on_shingle_matches_bare_forest() {
        let mut f = small();
        for i in 0..50 {
            let _ = f.update_scalar(i as f64 * 0.01).unwrap();
        }
        let scalar_codisp: f64 = f.score_codisp_stateless_scalar(5.0).unwrap().into();
        let shingle = f.shingle_with(5.0).unwrap();
        let direct: f64 = f.forest().score_codisp_stateless(&shingle).unwrap().into();
        assert!((scalar_codisp - direct).abs() < 1.0e-12);
    }
}