Skip to main content

anomstream_core/
drift_aware.rs

1//! Shadow-forest drift recovery — wraps a live
2//! [`crate::RandomCutForest`] plus an optional shadow that warms
3//! on the post-drift stream, then atomically replaces the primary
4//! once the shadow has seen enough observations.
5//!
6//! Pairs with any upstream drift trigger — [`crate::AdwinDetector`]
7//! on the score stream, [`crate::FeatureDriftDetector`] PSI alert
8//! level, or [`crate::MetaDriftDetector`] CUSUM fire. The trigger
9//! logic lives outside this type; callers call
10//! [`DriftAwareForest::on_drift`] when they want a shadow to spawn.
11//!
12//! ```ignore
13//! use anomstream_core::{AdwinDetector, DriftAwareForest, DriftRecoveryConfig, ForestBuilder};
14//!
15//! let builder = ForestBuilder::<16>::new()
16//!     .num_trees(100)
17//!     .sample_size(256)
18//!     .seed(42);
19//! let mut detector = DriftAwareForest::new(
20//!     builder,
21//!     DriftRecoveryConfig::default(),
22//! )?;
23//! let mut adwin = AdwinDetector::default_bounded();
24//!
25//! for point in stream_of_points {
26//!     detector.update(point)?;
27//!     let score = detector.score(&point)?;
28//!     if adwin.update(f64::from(score)) {
29//!         detector.on_drift()?;           // spawn shadow
30//!     }
31//! }
32//! # Ok::<(), anomstream_core::RcfError>(())
33//! ```
34
35#![cfg(feature = "std")]
36
37use std::sync::Arc;
38
39use crate::config::ForestBuilder;
40use crate::domain::{AnomalyScore, DiVector};
41use crate::error::RcfResult;
42use crate::forest::RandomCutForest;
43use crate::metrics::{MetricsSink, default_sink, names};
44
/// Tuning knobs that decide when a shadow forest may be spawned and
/// when it is promoted over the primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DriftRecoveryConfig {
    /// Number of observations a shadow has to ingest before
    /// [`DriftAwareForest::update`] promotes it to primary. This is
    /// an absolute count; it is not derived from the builder's
    /// sample size, so scale it to the reservoir you configure.
    pub shadow_warmup: u64,
    /// Age (in observations) the primary must have reached before
    /// [`DriftAwareForest::on_drift`] agrees to spawn a shadow; on
    /// a younger primary `on_drift` is a no-op. Keeps a noisy
    /// trigger from re-arming recovery over and over.
    pub min_primary_age: u64,
}

impl Default for DriftRecoveryConfig {
    /// Default policy: 1 024 warm-up observations for the shadow and
    /// a minimum primary age of 512 observations.
    fn default() -> Self {
        let shadow_warmup = 1_024;
        let min_primary_age = 512;
        Self {
            shadow_warmup,
            min_primary_age,
        }
    }
}
67
68/// Stateful shadow forest — accumulates post-drift observations
69/// alongside the primary, then replaces it on warmup completion.
70#[derive(Debug)]
71struct ShadowState<const D: usize> {
72    /// Parallel forest being warmed on recent traffic.
73    forest: RandomCutForest<D>,
74    /// Observations ingested into the shadow since it was spawned.
75    seen: u64,
76}
77
78/// Forest wrapper that handles drift recovery via a shadow swap.
79///
80/// The primary forest handles every `score` / `score_many` call —
81/// this type is a drop-in facade for the hot-path. Drift recovery
82/// is entirely opt-in through [`Self::on_drift`]; without a
83/// trigger call the wrapper behaves exactly like a bare
84/// [`RandomCutForest`].
85#[derive(Debug)]
86pub struct DriftAwareForest<const D: usize> {
87    /// Live forest — every `score` reads from here.
88    primary: RandomCutForest<D>,
89    /// Optional shadow — `Some` between `on_drift` and the swap.
90    shadow: Option<ShadowState<D>>,
91    /// Observations the current primary has ingested.
92    primary_age: u64,
93    /// Builder template used to spawn fresh shadows.
94    builder: ForestBuilder<D>,
95    /// Recovery policy.
96    config: DriftRecoveryConfig,
97    /// Lifetime count of completed shadow swaps — observability.
98    swaps: u64,
99    /// Observability sink.
100    metrics: Arc<dyn MetricsSink>,
101}
102
103impl<const D: usize> DriftAwareForest<D> {
104    /// Build a drift-aware wrapper from a prepared [`ForestBuilder`].
105    /// The builder is cloned internally to spawn shadow forests on
106    /// demand.
107    ///
108    /// # Errors
109    ///
110    /// Propagates [`ForestBuilder::build`] failures.
111    pub fn new(builder: ForestBuilder<D>, config: DriftRecoveryConfig) -> RcfResult<Self> {
112        let primary = builder.clone().build()?;
113        Ok(Self {
114            primary,
115            shadow: None,
116            primary_age: 0,
117            builder,
118            config,
119            swaps: 0,
120            metrics: default_sink(),
121        })
122    }
123
124    /// Install a metrics sink — `on_drift` / swap emit counters,
125    /// shadow activity emits a gauge.
126    #[must_use]
127    pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
128        self.metrics = sink;
129        self
130    }
131
132    /// Read-only handle to the installed sink.
133    #[must_use]
134    pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
135        &self.metrics
136    }
137
138    /// Read-only access to the live primary forest.
139    #[must_use]
140    pub fn forest(&self) -> &RandomCutForest<D> {
141        &self.primary
142    }
143
144    /// Whether a shadow is currently warming.
145    #[must_use]
146    pub fn is_recovering(&self) -> bool {
147        self.shadow.is_some()
148    }
149
150    /// Number of observations the shadow has seen since spawn
151    /// (`0` when no shadow is active).
152    #[must_use]
153    pub fn shadow_progress(&self) -> u64 {
154        self.shadow.as_ref().map_or(0, |s| s.seen)
155    }
156
157    /// Observations the current primary has ingested since the
158    /// last shadow swap (or construction).
159    #[must_use]
160    pub fn primary_age(&self) -> u64 {
161        self.primary_age
162    }
163
164    /// Lifetime shadow swaps completed.
165    #[must_use]
166    pub fn swaps_total(&self) -> u64 {
167        self.swaps
168    }
169
170    /// Policy knobs.
171    #[must_use]
172    pub fn config(&self) -> DriftRecoveryConfig {
173        self.config
174    }
175
176    /// Fold `point` into the primary and, when present, into the
177    /// shadow. Triggers an atomic swap if the shadow has reached
178    /// `config.shadow_warmup`.
179    ///
180    /// # Errors
181    ///
182    /// Propagates [`RandomCutForest::update`] failures from either
183    /// path; on shadow error the shadow is discarded so the
184    /// primary stays healthy.
185    pub fn update(&mut self, point: [f64; D]) -> RcfResult<()> {
186        self.primary.update(point)?;
187        self.primary_age = self.primary_age.saturating_add(1);
188
189        if let Some(shadow) = self.shadow.as_mut() {
190            match shadow.forest.update(point) {
191                Ok(()) => {
192                    shadow.seen = shadow.seen.saturating_add(1);
193                }
194                Err(e) => {
195                    // Drop the shadow — primary path must stay
196                    // clean. The caller can re-arm via on_drift.
197                    self.shadow = None;
198                    self.metrics
199                        .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
200                    return Err(e);
201                }
202            }
203            if self
204                .shadow
205                .as_ref()
206                .is_some_and(|s| s.seen >= self.config.shadow_warmup)
207            {
208                self.swap_shadow_into_primary();
209            }
210        }
211        Ok(())
212    }
213
214    /// Score `point` against the primary. Shadow is not consulted
215    /// — scoring stays on the stable baseline until the swap lands.
216    ///
217    /// # Errors
218    ///
219    /// Propagates [`RandomCutForest::score`] failures.
220    pub fn score(&self, point: &[f64; D]) -> RcfResult<AnomalyScore> {
221        self.primary.score(point)
222    }
223
224    /// Attribution against the primary.
225    ///
226    /// # Errors
227    ///
228    /// Propagates [`RandomCutForest::attribution`] failures.
229    pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
230        self.primary.attribution(point)
231    }
232
233    /// Spawn a shadow forest to train on the post-drift stream.
234    /// No-op when a shadow is already warming, or when the primary
235    /// has not yet reached `config.min_primary_age` (anti-flap
236    /// guard).
237    ///
238    /// # Errors
239    ///
240    /// Propagates [`ForestBuilder::build`] failures.
241    pub fn on_drift(&mut self) -> RcfResult<bool> {
242        if self.shadow.is_some() {
243            return Ok(false);
244        }
245        if self.primary_age < self.config.min_primary_age {
246            return Ok(false);
247        }
248        let fresh = self.builder.clone().build()?;
249        self.shadow = Some(ShadowState {
250            forest: fresh,
251            seen: 0,
252        });
253        self.metrics
254            .inc_counter(names::DRIFT_AWARE_ON_DRIFT_TOTAL, 1);
255        self.metrics
256            .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 1.0);
257        Ok(true)
258    }
259
260    /// Cancel the current shadow (if any) without swapping. Used
261    /// when the trigger retracts its alert or the operator wants
262    /// to abort recovery.
263    pub fn abort_shadow(&mut self) {
264        self.shadow = None;
265        self.metrics
266            .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
267    }
268
269    /// Promote shadow → primary. Callers never invoke this
270    /// directly — [`Self::update`] handles the swap once the
271    /// shadow reaches `shadow_warmup`.
272    fn swap_shadow_into_primary(&mut self) {
273        if let Some(shadow) = self.shadow.take() {
274            self.primary = shadow.forest;
275            self.primary_age = shadow.seen;
276            self.swaps = self.swaps.saturating_add(1);
277            self.metrics.inc_counter(names::DRIFT_AWARE_SWAPS_TOTAL, 1);
278            self.metrics
279                .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
280        }
281    }
282}
283
#[cfg(test)]
// Test-only relaxations: unwrapping and exact float comparison are
// acceptable here because every input is deterministic.
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Small, seeded 2-D builder so the tests stay fast and
    /// reproducible.
    fn small_builder() -> ForestBuilder<2> {
        ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .seed(2026)
    }

    /// A freshly built wrapper has no shadow and no swaps.
    #[test]
    fn fresh_wrapper_has_no_shadow() {
        let d = DriftAwareForest::new(small_builder(), DriftRecoveryConfig::default()).unwrap();
        assert!(!d.is_recovering());
        assert_eq!(d.shadow_progress(), 0);
        assert_eq!(d.swaps_total(), 0);
    }

    /// `on_drift` is ignored while the primary is younger than
    /// `min_primary_age`.
    #[test]
    fn on_drift_requires_min_primary_age() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 10,
                min_primary_age: 50,
            },
        )
        .unwrap();
        // Only a handful of updates — below min_primary_age → no-op.
        for _ in 0..10 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(!d.on_drift().unwrap());
        assert!(!d.is_recovering());
    }

    /// A mature primary accepts the drift signal exactly once.
    #[test]
    fn on_drift_spawns_shadow_when_primary_mature() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 100,
                min_primary_age: 50,
            },
        )
        .unwrap();
        for _ in 0..60 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        assert_eq!(d.shadow_progress(), 0);
        // A second on_drift during recovery is a no-op.
        assert!(!d.on_drift().unwrap());
    }

    /// The shadow replaces the primary once it has seen
    /// `shadow_warmup` observations.
    #[test]
    fn shadow_promotes_after_warmup() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 30,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for _ in 0..20 {
            d.update([0.1, 0.2]).unwrap();
        }
        // The shadow must actually spawn, otherwise the swap
        // assertions below would be checking the wrong thing.
        assert!(d.on_drift().unwrap());
        for i in 0..30 {
            let v = f64::from(i) * 0.01;
            d.update([v, v + 0.5]).unwrap();
        }
        assert!(!d.is_recovering());
        assert_eq!(d.swaps_total(), 1);
        // Primary age should reset to the shadow's warm-up count.
        assert_eq!(d.primary_age(), 30);
    }

    /// Aborting drops the shadow without counting a swap.
    #[test]
    fn abort_shadow_discards_recovery() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 100,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for _ in 0..20 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        d.abort_shadow();
        assert!(!d.is_recovering());
        assert_eq!(d.swaps_total(), 0);
    }

    /// Spawning a shadow must not change what `score` returns.
    #[test]
    fn score_uses_primary_forest_always() {
        // `min_primary_age` has to sit below the 100 updates fed
        // here. With the default policy (512) `on_drift` would be a
        // no-op and this test would never enter recovery.
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 1_000,
                min_primary_age: 50,
            },
        )
        .unwrap();
        for i in 0..100 {
            let v = f64::from(i) * 0.01;
            d.update([v, v + 0.5]).unwrap();
        }
        // Even during recovery the public `score` reads from the
        // primary (stable baseline).
        let s_before: f64 = d.score(&[0.5, 1.0]).unwrap().into();
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        let s_during: f64 = d.score(&[0.5, 1.0]).unwrap().into();
        assert_eq!(s_before, s_during);
    }
}