anomstream_core/shingled.rs
1//! Internal shingling on top of [`crate::RandomCutForest`].
2//!
3//! Turns a **scalar stream** into a `D`-dim feature vector by keeping
4//! the last `D` observations in a ring buffer. Each new scalar shifts
5//! the window and emits a fresh `[f64; D]` to the forest. Isolation-
6//! depth scoring on the shingled view captures **temporal
7//! autocorrelation** that bare scalar scoring cannot — a dwell
8//! anomaly at constant rate (NAB `rogue_agent_key_hold`) does not
9//! expand the forest's bounding box on the raw scalar, but on the
10//! shingled vector the anomalous subsequence sits far from the
11//! baseline subsequences in the `D`-dim shingle space.
12//!
13//! Matches the shape of AWS Java's `RotateShingle` (random cut forest
14//! with internal ring buffer). This module is the RCF-side fix for
15//! the `rogue_agent_key_hold` = 0.145 / `SWaT` = 0.282 failures
16//! documented in `docs/performance.md`.
17//!
18//! # Build
19//!
20//! ```ignore
21//! use anomstream_core::ShingledForestBuilder;
22//!
23//! let mut forest = ShingledForestBuilder::<32>::new()
24//! .num_trees(100)
25//! .sample_size(256)
26//! .seed(2026)
27//! .build()?;
28//!
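//! for sample in stream_of_scalars {
//!     // Score before updating: `score_scalar` appends `sample` to the
//!     // buffered window itself, so scoring after `update_scalar(sample)`
//!     // would place `sample` in the shingle twice.
//!     if forest.is_warmed() {
//!         let score = forest.score_scalar(sample)?;
//!         if f64::from(score) > 1.5 { eprintln!("contextual anomaly"); }
//!     }
//!     let _ = forest.update_scalar(sample)?;
//! }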
35//! ```
36//!
37//! # Shingled embedding shape
38//!
39//! For shingle size `D`, the emitted vector is
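//! `[v_{t-D+1}, …, v_{t-1}, v_t]` — oldest-first, newest-last. The
//! ring buffer pre-loads on the first `D` scalars, and every call
//! submits the window as it stood *before* the new scalar was folded
//! in; `update_scalar` therefore returns `false` for the first `D`
//! calls and `true` from call `D + 1` onward, when the forest receives
//! its first shingle.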
44//!
45//! # When to z-score
46//!
47//! Scaling is the caller's job. For NDR feature dims with wildly
48//! different magnitudes (packet-rate, entropy, port-count), z-score
49//! each scalar against its warm-phase `(mean, stddev)` **before**
50//! handing it to [`ShingledForest::update_scalar`] — RCF cuts are
51//! range-weighted, un-normalised scalars let whichever dim carries
52//! the biggest range dominate every cut.
53
54#![cfg(feature = "std")]
55
56use crate::domain::{AnomalyScore, DiVector};
57use crate::error::{RcfError, RcfResult};
58use crate::forest::RandomCutForest;
59use crate::{ForestBuilder, RcfConfig};
60
61/// Builder producing a [`ShingledForest`]. Delegates every RCF
62/// hyperparameter to [`ForestBuilder`] — the only extra is the
63/// compile-time shingle size which equals the forest
64/// dimensionality `D`.
65///
66/// The const-generic `D` **is** the shingle size: one shingled
67/// vector = last `D` scalars.
68#[derive(Debug)]
69pub struct ShingledForestBuilder<const D: usize> {
70 /// Underlying bare-forest builder — full passthrough of every
71 /// tuning knob.
72 inner: ForestBuilder<D>,
73}
74
75impl<const D: usize> Default for ShingledForestBuilder<D> {
76 fn default() -> Self {
77 Self::new()
78 }
79}
80
81impl<const D: usize> ShingledForestBuilder<D> {
82 /// Start a fresh builder with the bare-forest defaults.
83 #[must_use]
84 pub fn new() -> Self {
85 Self {
86 inner: ForestBuilder::<D>::new(),
87 }
88 }
89
90 /// Number of trees — forwarded to [`ForestBuilder::num_trees`].
91 #[must_use]
92 pub fn num_trees(mut self, trees: usize) -> Self {
93 self.inner = self.inner.num_trees(trees);
94 self
95 }
96
97 /// Sample size — forwarded to [`ForestBuilder::sample_size`].
98 #[must_use]
99 pub fn sample_size(mut self, sample: usize) -> Self {
100 self.inner = self.inner.sample_size(sample);
101 self
102 }
103
104 /// Master seed — forwarded to [`ForestBuilder::seed`].
105 #[must_use]
106 pub fn seed(mut self, seed: u64) -> Self {
107 self.inner = self.inner.seed(seed);
108 self
109 }
110
111 /// Time-decay — forwarded to [`ForestBuilder::time_decay`].
112 #[must_use]
113 pub fn time_decay(mut self, decay: f64) -> Self {
114 self.inner = self.inner.time_decay(decay);
115 self
116 }
117
118 /// Fetch the resolved [`RcfConfig`] that [`Self::build`] would
119 /// use — mirrors [`ForestBuilder::config`].
120 #[must_use]
121 pub fn config(&self) -> &RcfConfig {
122 self.inner.config()
123 }
124
125 /// Build the shingled forest. Fails exactly when the underlying
126 /// [`ForestBuilder::build`] fails.
127 ///
128 /// # Errors
129 ///
130 /// Propagates [`ForestBuilder::build`] errors.
131 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
132 pub fn build(self) -> RcfResult<ShingledForest<D>> {
133 let forest = self.inner.build()?;
134 Ok(ShingledForest {
135 forest,
136 ring: [0.0_f64; D],
137 filled: 0,
138 cursor: 0,
139 warmed: false,
140 })
141 }
142}
143
144/// `D`-dim shingled wrapper over [`RandomCutForest`] — scalar-stream
145/// input, internal ring buffer of the last `D` samples.
146///
147/// The ring buffer is stored oldest-to-newest logically but laid out
148/// as a **circular array** internally — constant-time update with no
149/// allocation. [`ShingledForest::current_shingle`] exposes the
150/// logical shingle in read-only form (oldest-first) for diagnostics.
151pub struct ShingledForest<const D: usize> {
152 /// Wrapped bare forest operating on shingled `[f64; D]` points.
153 forest: RandomCutForest<D>,
154 /// Circular storage for the last `D` scalars. `cursor` points
155 /// to the slot that will be overwritten on the next update.
156 ring: [f64; D],
157 /// Scalars received since construction / last `reset` — saturates
158 /// at `D`, used by [`Self::is_warmed`].
159 filled: usize,
160 /// Next write position in `ring`.
161 cursor: usize,
162 /// `true` once at least one full shingle has been submitted to
163 /// the forest.
164 warmed: bool,
165}
166
167impl<const D: usize> core::fmt::Debug for ShingledForest<D> {
168 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
169 f.debug_struct("ShingledForest")
170 .field("shingle_size", &D)
171 .field("filled", &self.filled)
172 .field("warmed", &self.warmed)
173 .finish_non_exhaustive()
174 }
175}
176
177impl<const D: usize> ShingledForest<D> {
178 /// Shingle size (equals the compile-time `D`).
179 #[must_use]
180 pub const fn shingle_size(&self) -> usize {
181 D
182 }
183
184 /// Whether the ring buffer holds a full `D`-scalar window and
185 /// the forest has received at least one shingle.
186 #[must_use]
187 pub const fn is_warmed(&self) -> bool {
188 self.warmed
189 }
190
191 /// Immutable view of the underlying bare forest — use this to
192 /// inspect tree state, read metrics, or route through the
193 /// [`RandomCutForest::forensic_baseline`] / `attribution`
194 /// helpers on the already-shingled last point.
195 #[must_use]
196 pub fn forest(&self) -> &RandomCutForest<D> {
197 &self.forest
198 }
199
200 /// Mutable escape hatch — handy for bootstrap replay
201 /// ([`RandomCutForest::bootstrap`]) when the caller has
202 /// pre-shingled their warm-up corpus.
203 pub fn forest_mut(&mut self) -> &mut RandomCutForest<D> {
204 &mut self.forest
205 }
206
207 /// Snapshot the current shingle in logical order (oldest-first).
208 /// Returns `None` while the ring is still partially empty.
209 #[must_use]
210 pub fn current_shingle(&self) -> Option<[f64; D]> {
211 if self.filled < D {
212 return None;
213 }
214 Some(self.materialise_shingle())
215 }
216
217 /// Fold `value` into the ring buffer; once the ring is full,
218 /// forward the shingled window to the forest. Returns `true`
219 /// when the shingle was submitted to the forest (i.e. the ring
220 /// was full before the call).
221 ///
222 /// # Errors
223 ///
224 /// - [`RcfError::NaNValue`] on non-finite `value`.
225 /// - Propagates [`RandomCutForest::update`] failures once the
226 /// shingle is submitted.
227 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
228 pub fn update_scalar(&mut self, value: f64) -> RcfResult<bool> {
229 if !value.is_finite() {
230 return Err(RcfError::NaNValue);
231 }
232 // Submit the *previous* shingle before rotating the ring —
233 // the new scalar becomes the newest entry of the shingle
234 // seen by the forest next call.
235 let submitted = if self.filled >= D {
236 let shingle = self.materialise_shingle();
237 self.forest.update(shingle)?;
238 self.warmed = true;
239 true
240 } else {
241 false
242 };
243 // Rotate the ring.
244 self.ring[self.cursor] = value;
245 self.cursor = (self.cursor + 1) % D;
246 if self.filled < D {
247 self.filled += 1;
248 }
249 Ok(submitted)
250 }
251
252 /// Score `value` against the frozen forest **without** folding
253 /// it into the ring buffer. The query uses the current shingle
254 /// with `value` appended as the newest slot — matches what a
255 /// subsequent [`Self::update_scalar`] would submit.
256 ///
257 /// # Errors
258 ///
259 /// - [`RcfError::NaNValue`] on non-finite `value`.
260 /// - [`RcfError::EmptyForest`] before the ring buffer is full
261 /// or the forest has not yet received its first update.
262 /// - Propagates [`RandomCutForest::score`] failures.
263 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
264 pub fn score_scalar(&self, value: f64) -> RcfResult<AnomalyScore> {
265 if !value.is_finite() {
266 return Err(RcfError::NaNValue);
267 }
268 let shingle = self.shingle_with(value)?;
269 self.forest.score(&shingle)
270 }
271
272 /// Attribution on the shingle formed by appending `value` to
273 /// the current ring. Returns a `D`-dim [`DiVector`] where each
274 /// dim is a **lag index** (0 = oldest, `D-1` = newest / `value`).
275 ///
276 /// # Errors
277 ///
278 /// Same as [`Self::score_scalar`].
279 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
280 pub fn attribution_scalar(&self, value: f64) -> RcfResult<DiVector> {
281 if !value.is_finite() {
282 return Err(RcfError::NaNValue);
283 }
284 let shingle = self.shingle_with(value)?;
285 self.forest.attribution(&shingle)
286 }
287
288 /// Stateless codisp on the shingle formed with `value` appended.
289 /// Non-mutating — preserves the frozen-baseline contract across
290 /// long streams. Prefer this over the mutating `score_codisp`
291 /// path for shingled forensic replay.
292 ///
293 /// # Errors
294 ///
295 /// Same as [`Self::score_scalar`].
296 pub fn score_codisp_stateless_scalar(&self, value: f64) -> RcfResult<AnomalyScore> {
297 if !value.is_finite() {
298 return Err(RcfError::NaNValue);
299 }
300 let shingle = self.shingle_with(value)?;
301 self.forest.score_codisp_stateless(&shingle)
302 }
303
304 /// Drop the ring buffer and reset the warm-up flag; the
305 /// underlying forest is **not** reset — callers who want a
306 /// full state wipe should rebuild.
307 pub fn reset_ring(&mut self) {
308 self.ring = [0.0_f64; D];
309 self.filled = 0;
310 self.cursor = 0;
311 self.warmed = false;
312 }
313
314 /// Logical oldest-first materialisation of the ring.
315 fn materialise_shingle(&self) -> [f64; D] {
316 let mut out = [0.0_f64; D];
317 // `cursor` points at the slot that will be overwritten
318 // next, which equals the position of the *oldest* entry
319 // when the ring is full.
320 for (i, slot) in out.iter_mut().enumerate() {
321 *slot = self.ring[(self.cursor + i) % D];
322 }
323 out
324 }
325
326 /// Build the shingle that would result from appending `value`
327 /// to the current ring, without mutating the ring.
328 fn shingle_with(&self, value: f64) -> RcfResult<[f64; D]> {
329 if self.filled < D {
330 return Err(RcfError::EmptyForest);
331 }
332 let mut out = [0.0_f64; D];
333 // Drop the oldest entry (at `cursor`), shift the rest left
334 // by one, append `value` as newest.
335 for (i, slot) in out.iter_mut().enumerate().take(D - 1) {
336 *slot = self.ring[(self.cursor + 1 + i) % D];
337 }
338 out[D - 1] = value;
339 Ok(out)
340 }
341}
342
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_lossless,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Small deterministic 4-dim fixture shared by most tests.
    fn small() -> ShingledForest<4> {
        let builder = ShingledForestBuilder::<4>::new()
            .num_trees(50)
            .sample_size(64)
            .seed(2026);
        builder.build().unwrap()
    }

    /// The first `D` scalars only fill the ring; the first submission
    /// happens on scalar `D + 1`.
    #[test]
    fn warm_up_requires_d_scalars() {
        let mut forest = small();
        for value in [0.0_f64, 1.0, 2.0] {
            let submitted = forest.update_scalar(value).unwrap();
            assert!(!submitted, "shouldn't submit before ring is full");
            assert!(!forest.is_warmed());
        }
        // 4th scalar fills the ring, but nothing reaches the forest
        // until the following update.
        let fourth = forest.update_scalar(3.0).unwrap();
        assert!(!fourth);
        assert_eq!(forest.current_shingle(), Some([0.0, 1.0, 2.0, 3.0]));
        // 5th scalar — the previous window [0,1,2,3] is submitted now.
        let fifth = forest.update_scalar(4.0).unwrap();
        assert!(fifth);
        assert!(forest.is_warmed());
        assert_eq!(forest.current_shingle(), Some([1.0, 2.0, 3.0, 4.0]));
    }

    /// Both NaN and infinity are rejected as `NaNValue`.
    #[test]
    fn update_scalar_rejects_nan() {
        let mut forest = small();
        for bad in [f64::NAN, f64::INFINITY] {
            let err = forest.update_scalar(bad).unwrap_err();
            assert!(matches!(err, RcfError::NaNValue));
        }
    }

    /// Scoring on an empty ring reports `EmptyForest`.
    #[test]
    fn score_before_warm_fails() {
        let forest = small();
        let err = forest.score_scalar(1.0).unwrap_err();
        assert!(matches!(err, RcfError::EmptyForest));
    }

    /// A warmed forest yields a finite, non-negative score.
    #[test]
    fn score_after_warm_returns_non_negative() {
        let mut forest = small();
        for step in 0_i32..200 {
            let sample = f64::from(step) * 0.01;
            let _ = forest.update_scalar(sample).unwrap();
        }
        let score: f64 = forest.score_scalar(10.0).unwrap().into();
        assert!(score.is_finite());
        assert!(score >= 0.0);
    }

    /// A far-away probe must outscore a probe inside the warm cluster.
    #[test]
    fn outlier_scalar_scores_higher_than_in_cluster() {
        let mut forest = ShingledForestBuilder::<8>::new()
            .num_trees(100)
            .sample_size(128)
            .seed(7)
            .build()
            .unwrap();
        // Warm on a tight cluster. The phase is accumulated step by
        // step so the exact sample sequence is reproduced.
        let mut tick = 0.0_f64;
        for _ in 0..1_000 {
            let sample = (tick.sin() + 1.0) * 0.1;
            let _ = forest.update_scalar(sample).unwrap();
            tick += 0.1;
        }
        // In-cluster probe.
        let normal: f64 = forest.score_scalar(0.10).unwrap().into();
        // Outlier probe.
        let outlier: f64 = forest.score_scalar(100.0).unwrap().into();
        assert!(
            outlier > normal,
            "outlier {outlier} should exceed in-cluster {normal}"
        );
    }

    /// Scoring is read-only with respect to the ring.
    #[test]
    fn shingle_with_does_not_mutate_ring() {
        let mut forest = small();
        for step in 0_i32..5 {
            let _ = forest.update_scalar(f64::from(step)).unwrap();
        }
        let before = forest.current_shingle().unwrap();
        let _ = forest.score_scalar(99.0).unwrap();
        let after = forest.current_shingle().unwrap();
        assert_eq!(before, after);
    }

    /// `reset_ring` clears ring state while the forest stays usable.
    #[test]
    fn reset_ring_clears_warm_state_but_preserves_forest() {
        let mut forest = small();
        for step in 0_i32..10 {
            let _ = forest.update_scalar(f64::from(step)).unwrap();
        }
        assert!(forest.is_warmed());
        forest.reset_ring();
        assert!(!forest.is_warmed());
        assert_eq!(forest.current_shingle(), None);
        // Forest still holds its leaves — a fresh shingle submission
        // after re-warming should score against the prior baseline.
        for step in 0_i32..10 {
            let _ = forest.update_scalar(f64::from(step)).unwrap();
        }
        let score: f64 = forest.score_scalar(100.0).unwrap().into();
        assert!(score.is_finite());
    }

    /// The scalar codisp wrapper agrees with calling the bare forest
    /// on the same shingle.
    #[test]
    fn codisp_stateless_on_shingle_matches_bare_forest() {
        let mut forest = small();
        for step in 0_i32..50 {
            let _ = forest.update_scalar(f64::from(step) * 0.01).unwrap();
        }
        let via_wrapper: f64 = forest.score_codisp_stateless_scalar(5.0).unwrap().into();
        let shingle = forest.shingle_with(5.0).unwrap();
        let via_forest: f64 = forest
            .forest()
            .score_codisp_stateless(&shingle)
            .unwrap()
            .into();
        assert!((via_wrapper - via_forest).abs() < 1.0e-12);
    }
}
482}