anomstream_core/drift_aware.rs
1//! Shadow-forest drift recovery — wraps a live
2//! [`crate::RandomCutForest`] plus an optional shadow that warms
3//! on the post-drift stream, then atomically replaces the primary
4//! once the shadow has seen enough observations.
5//!
6//! Pairs with any upstream drift trigger — [`crate::AdwinDetector`]
7//! on the score stream, [`crate::FeatureDriftDetector`] PSI alert
8//! level, or [`crate::MetaDriftDetector`] CUSUM fire. The trigger
9//! logic lives outside this type; callers call
10//! [`DriftAwareForest::on_drift`] when they want a shadow to spawn.
11//!
12//! ```ignore
13//! use anomstream_core::{AdwinDetector, DriftAwareForest, DriftRecoveryConfig, ForestBuilder};
14//!
15//! let builder = ForestBuilder::<16>::new()
16//! .num_trees(100)
17//! .sample_size(256)
18//! .seed(42);
19//! let mut detector = DriftAwareForest::new(
20//! builder,
21//! DriftRecoveryConfig::default(),
22//! )?;
23//! let mut adwin = AdwinDetector::default_bounded();
24//!
25//! for point in stream_of_points {
26//! detector.update(point)?;
27//! let score = detector.score(&point)?;
28//! if adwin.update(f64::from(score)) {
29//! detector.on_drift()?; // spawn shadow
30//! }
31//! }
32//! # Ok::<(), anomstream_core::RcfError>(())
33//! ```
34
35#![cfg(feature = "std")]
36
37use std::sync::Arc;
38
39use crate::config::ForestBuilder;
40use crate::domain::{AnomalyScore, DiVector};
41use crate::error::RcfResult;
42use crate::forest::RandomCutForest;
43use crate::metrics::{MetricsSink, default_sink, names};
44
/// Thresholds that govern when a shadow forest may be spawned and
/// when it is promoted to primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DriftRecoveryConfig {
    /// Promotion threshold: once the shadow has ingested at least
    /// this many observations, [`DriftAwareForest::update`] swaps it
    /// in as the new primary.
    pub shadow_warmup: u64,
    /// Spawn threshold: [`DriftAwareForest::on_drift`] is a no-op
    /// while the primary's age is below this value, which damps
    /// flap-loops from a noisy trigger. A freshly promoted primary
    /// starts with the shadow's observation count as its age.
    pub min_primary_age: u64,
}
58
59impl Default for DriftRecoveryConfig {
60 fn default() -> Self {
61 Self {
62 shadow_warmup: 1_024,
63 min_primary_age: 512,
64 }
65 }
66}
67
68/// Stateful shadow forest — accumulates post-drift observations
69/// alongside the primary, then replaces it on warmup completion.
70#[derive(Debug)]
71struct ShadowState<const D: usize> {
72 /// Parallel forest being warmed on recent traffic.
73 forest: RandomCutForest<D>,
74 /// Observations ingested into the shadow since it was spawned.
75 seen: u64,
76}
77
78/// Forest wrapper that handles drift recovery via a shadow swap.
79///
80/// The primary forest handles every `score` / `score_many` call —
81/// this type is a drop-in facade for the hot-path. Drift recovery
82/// is entirely opt-in through [`Self::on_drift`]; without a
83/// trigger call the wrapper behaves exactly like a bare
84/// [`RandomCutForest`].
85#[derive(Debug)]
86pub struct DriftAwareForest<const D: usize> {
87 /// Live forest — every `score` reads from here.
88 primary: RandomCutForest<D>,
89 /// Optional shadow — `Some` between `on_drift` and the swap.
90 shadow: Option<ShadowState<D>>,
91 /// Observations the current primary has ingested.
92 primary_age: u64,
93 /// Builder template used to spawn fresh shadows.
94 builder: ForestBuilder<D>,
95 /// Recovery policy.
96 config: DriftRecoveryConfig,
97 /// Lifetime count of completed shadow swaps — observability.
98 swaps: u64,
99 /// Observability sink.
100 metrics: Arc<dyn MetricsSink>,
101}
102
103impl<const D: usize> DriftAwareForest<D> {
104 /// Build a drift-aware wrapper from a prepared [`ForestBuilder`].
105 /// The builder is cloned internally to spawn shadow forests on
106 /// demand.
107 ///
108 /// # Errors
109 ///
110 /// Propagates [`ForestBuilder::build`] failures.
111 pub fn new(builder: ForestBuilder<D>, config: DriftRecoveryConfig) -> RcfResult<Self> {
112 let primary = builder.clone().build()?;
113 Ok(Self {
114 primary,
115 shadow: None,
116 primary_age: 0,
117 builder,
118 config,
119 swaps: 0,
120 metrics: default_sink(),
121 })
122 }
123
124 /// Install a metrics sink — `on_drift` / swap emit counters,
125 /// shadow activity emits a gauge.
126 #[must_use]
127 pub fn with_metrics_sink(mut self, sink: Arc<dyn MetricsSink>) -> Self {
128 self.metrics = sink;
129 self
130 }
131
132 /// Read-only handle to the installed sink.
133 #[must_use]
134 pub fn metrics_sink(&self) -> &Arc<dyn MetricsSink> {
135 &self.metrics
136 }
137
138 /// Read-only access to the live primary forest.
139 #[must_use]
140 pub fn forest(&self) -> &RandomCutForest<D> {
141 &self.primary
142 }
143
144 /// Whether a shadow is currently warming.
145 #[must_use]
146 pub fn is_recovering(&self) -> bool {
147 self.shadow.is_some()
148 }
149
150 /// Number of observations the shadow has seen since spawn
151 /// (`0` when no shadow is active).
152 #[must_use]
153 pub fn shadow_progress(&self) -> u64 {
154 self.shadow.as_ref().map_or(0, |s| s.seen)
155 }
156
157 /// Observations the current primary has ingested since the
158 /// last shadow swap (or construction).
159 #[must_use]
160 pub fn primary_age(&self) -> u64 {
161 self.primary_age
162 }
163
164 /// Lifetime shadow swaps completed.
165 #[must_use]
166 pub fn swaps_total(&self) -> u64 {
167 self.swaps
168 }
169
170 /// Policy knobs.
171 #[must_use]
172 pub fn config(&self) -> DriftRecoveryConfig {
173 self.config
174 }
175
176 /// Fold `point` into the primary and, when present, into the
177 /// shadow. Triggers an atomic swap if the shadow has reached
178 /// `config.shadow_warmup`.
179 ///
180 /// # Errors
181 ///
182 /// Propagates [`RandomCutForest::update`] failures from either
183 /// path; on shadow error the shadow is discarded so the
184 /// primary stays healthy.
185 pub fn update(&mut self, point: [f64; D]) -> RcfResult<()> {
186 self.primary.update(point)?;
187 self.primary_age = self.primary_age.saturating_add(1);
188
189 if let Some(shadow) = self.shadow.as_mut() {
190 match shadow.forest.update(point) {
191 Ok(()) => {
192 shadow.seen = shadow.seen.saturating_add(1);
193 }
194 Err(e) => {
195 // Drop the shadow — primary path must stay
196 // clean. The caller can re-arm via on_drift.
197 self.shadow = None;
198 self.metrics
199 .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
200 return Err(e);
201 }
202 }
203 if self
204 .shadow
205 .as_ref()
206 .is_some_and(|s| s.seen >= self.config.shadow_warmup)
207 {
208 self.swap_shadow_into_primary();
209 }
210 }
211 Ok(())
212 }
213
214 /// Score `point` against the primary. Shadow is not consulted
215 /// — scoring stays on the stable baseline until the swap lands.
216 ///
217 /// # Errors
218 ///
219 /// Propagates [`RandomCutForest::score`] failures.
220 pub fn score(&self, point: &[f64; D]) -> RcfResult<AnomalyScore> {
221 self.primary.score(point)
222 }
223
224 /// Attribution against the primary.
225 ///
226 /// # Errors
227 ///
228 /// Propagates [`RandomCutForest::attribution`] failures.
229 pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
230 self.primary.attribution(point)
231 }
232
233 /// Spawn a shadow forest to train on the post-drift stream.
234 /// No-op when a shadow is already warming, or when the primary
235 /// has not yet reached `config.min_primary_age` (anti-flap
236 /// guard).
237 ///
238 /// # Errors
239 ///
240 /// Propagates [`ForestBuilder::build`] failures.
241 pub fn on_drift(&mut self) -> RcfResult<bool> {
242 if self.shadow.is_some() {
243 return Ok(false);
244 }
245 if self.primary_age < self.config.min_primary_age {
246 return Ok(false);
247 }
248 let fresh = self.builder.clone().build()?;
249 self.shadow = Some(ShadowState {
250 forest: fresh,
251 seen: 0,
252 });
253 self.metrics
254 .inc_counter(names::DRIFT_AWARE_ON_DRIFT_TOTAL, 1);
255 self.metrics
256 .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 1.0);
257 Ok(true)
258 }
259
260 /// Cancel the current shadow (if any) without swapping. Used
261 /// when the trigger retracts its alert or the operator wants
262 /// to abort recovery.
263 pub fn abort_shadow(&mut self) {
264 self.shadow = None;
265 self.metrics
266 .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
267 }
268
269 /// Promote shadow → primary. Callers never invoke this
270 /// directly — [`Self::update`] handles the swap once the
271 /// shadow reaches `shadow_warmup`.
272 fn swap_shadow_into_primary(&mut self) {
273 if let Some(shadow) = self.shadow.take() {
274 self.primary = shadow.forest;
275 self.primary_age = shadow.seen;
276 self.swaps = self.swaps.saturating_add(1);
277 self.metrics.inc_counter(names::DRIFT_AWARE_SWAPS_TOTAL, 1);
278 self.metrics
279 .set_gauge(names::DRIFT_AWARE_SHADOW_ACTIVE, 0.0);
280 }
281 }
282}
283
#[cfg(test)]
// Test-only relaxations: `unwrap` doubles as the assertion mechanism
// and the scoring test compares floats exactly on purpose.
#[allow(
    clippy::unwrap_used,
    clippy::panic,
    clippy::float_cmp,
    clippy::cast_precision_loss
)]
mod tests {
    use super::*;

    /// Small, seeded 2-D forest template shared by every test.
    fn small_builder() -> ForestBuilder<2> {
        ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .seed(2026)
    }

    #[test]
    fn fresh_wrapper_has_no_shadow() {
        let d = DriftAwareForest::new(small_builder(), DriftRecoveryConfig::default()).unwrap();
        assert!(!d.is_recovering());
        assert_eq!(d.shadow_progress(), 0);
        assert_eq!(d.swaps_total(), 0);
    }

    #[test]
    fn on_drift_requires_min_primary_age() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 10,
                min_primary_age: 50,
            },
        )
        .unwrap();
        // Only a handful of updates — below min_primary_age → no-op.
        for _ in 0..10 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(!d.on_drift().unwrap());
        assert!(!d.is_recovering());
    }

    #[test]
    fn on_drift_spawns_shadow_when_primary_mature() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 100,
                min_primary_age: 50,
            },
        )
        .unwrap();
        for _ in 0..60 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        assert_eq!(d.shadow_progress(), 0);
        // A second on_drift during recovery is a no-op.
        assert!(!d.on_drift().unwrap());
    }

    #[test]
    fn shadow_progress_tracks_updates_during_recovery() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 100,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for _ in 0..20 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        for _ in 0..5 {
            d.update([0.3, 0.4]).unwrap();
        }
        // Shadow counts only post-spawn points; primary keeps ageing.
        assert!(d.is_recovering());
        assert_eq!(d.shadow_progress(), 5);
        assert_eq!(d.primary_age(), 25);
        assert_eq!(d.swaps_total(), 0);
    }

    #[test]
    fn shadow_promotes_after_warmup() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 30,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for _ in 0..20 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        for i in 0..30 {
            let v = f64::from(i) * 0.01;
            d.update([v, v + 0.5]).unwrap();
        }
        assert!(!d.is_recovering());
        assert_eq!(d.shadow_progress(), 0);
        assert_eq!(d.swaps_total(), 1);
        // Primary age should reset to the shadow's warm-up count.
        assert_eq!(d.primary_age(), 30);
    }

    #[test]
    fn abort_shadow_discards_recovery() {
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 100,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for _ in 0..20 {
            d.update([0.1, 0.2]).unwrap();
        }
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        d.abort_shadow();
        assert!(!d.is_recovering());
        assert_eq!(d.swaps_total(), 0);
    }

    #[test]
    fn score_uses_primary_forest_always() {
        // The default config (min_primary_age = 512) would turn
        // `on_drift` into a no-op after only 100 updates, so use a
        // policy that actually lets the shadow spawn and never
        // promotes it within this test.
        let mut d = DriftAwareForest::new(
            small_builder(),
            DriftRecoveryConfig {
                shadow_warmup: 1_000,
                min_primary_age: 10,
            },
        )
        .unwrap();
        for i in 0..100 {
            let v = f64::from(i) * 0.01;
            d.update([v, v + 0.5]).unwrap();
        }
        // Even during recovery the public `score` reads from the
        // primary (stable baseline).
        let s_before: f64 = d.score(&[0.5, 1.0]).unwrap().into();
        assert!(d.on_drift().unwrap());
        assert!(d.is_recovering());
        let s_during: f64 = d.score(&[0.5, 1.0]).unwrap().into();
        assert_eq!(s_before, s_during);
    }
}
405}