Skip to main content

anomstream_core/
bootstrap.rs

1//! Cold-start bootstrap: warm a detector from historical data before
2//! exposing it to live traffic.
3//!
4//! A freshly-built [`crate::RandomCutForest`] or
5//! [`crate::ThresholdedForest`] has an empty reservoir and (for the
6//! thresholded variant) an EMA of the anomaly-score stream with zero
7//! observations. Real scoring only becomes meaningful once the
8//! detector has seen enough points to populate the reservoir and
9//! converge the adaptive threshold. In a production streaming agent
10//! this warmup window is a coverage hole at every restart — unless
11//! the caller can replay a slice of recent history (from a TSDB,
12//! Kafka topic, S3 parquet dump…) before going live.
13//!
//! [`RandomCutForest::bootstrap`], [`ThresholdedForest::bootstrap`]
//! and [`crate::TenantForestPool::bootstrap`] accept any
//! [`IntoIterator`] of `[f64; D]` points and ingest them through the
//! normal `update` / `process` path. Each returns a [`BootstrapReport`]
//! so the caller can confirm the detector is ready for live traffic.
//! For a thresholded forest, that means observations past the
//! configured warmup window and a threshold above its floor.
20//!
21//! Points containing non-finite components (`NaN`, `±∞`) are
22//! **skipped** and tallied in the report rather than aborting the
23//! whole bootstrap — historical TSDB query results routinely contain
24//! gaps, and a single bad row should not sink the restart.
25
26use crate::error::{RcfError, RcfResult};
27use crate::forest::RandomCutForest;
28use crate::thresholded::ThresholdedForest;
29
/// Summary of a bootstrap replay — what went in, what was filtered
/// out, and where the detector's warmup stands afterwards.
///
/// Used by callers to check the detector is ready for live traffic
/// before the streaming pipeline is switched on. A rule of thumb:
/// `final_observations >= min_observations` (for a thresholded
/// forest) means the threshold is adaptive, not the floor.
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct BootstrapReport {
    /// Number of points successfully folded into the detector.
    pub points_ingested: u64,
    /// Number of points skipped by the replay. Rows with a non-finite
    /// component (`NaN` / `±∞`) are filtered out before they reach the
    /// detector. Any point the detector itself rejects with
    /// [`RcfError::NaNValue`] is also counted here.
    pub points_skipped: u64,
    /// Detector observation count after the replay.
    ///
    /// For [`ThresholdedForest`] this is the observation count of the
    /// score-stream EMA stats. For [`RandomCutForest`], which has no
    /// threshold layer, it is the forest's update counter
    /// ([`RandomCutForest::updates_seen`]). In both cases the value is
    /// read back from the detector rather than derived from
    /// `points_ingested`, so the two need not be equal.
    pub final_observations: u64,
    /// Adaptive threshold at the end of the replay, as reported by
    /// [`ThresholdedForest::current_threshold`]. Always `0.0` for
    /// [`RandomCutForest`], which has no threshold layer.
    pub final_threshold: f64,
}
53
54impl BootstrapReport {
55    /// Empty report — zero points, threshold at the configured floor.
56    #[must_use]
57    pub fn empty() -> Self {
58        Self {
59            points_ingested: 0,
60            points_skipped: 0,
61            final_observations: 0,
62            final_threshold: 0.0,
63        }
64    }
65
66    /// Whether any historical point actually made it into the
67    /// detector. A `false` return means the iterator was empty or
68    /// every row was non-finite — the detector is still cold and
69    /// should be treated as warming-up by downstream consumers.
70    #[must_use]
71    pub fn is_hot(&self) -> bool {
72        self.points_ingested > 0
73    }
74}
75
76impl Default for BootstrapReport {
77    fn default() -> Self {
78        Self::empty()
79    }
80}
81
/// Bootstrap skip policy.
///
/// Returns `true` only when every one of the `D` components is finite,
/// meaning no `NaN`, `+∞` or `-∞`. A zero-dimensional point is
/// vacuously finite. The bootstrap loops skip, without touching the
/// detector, any point for which this returns `false`.
fn is_finite_point<const D: usize>(p: &[f64; D]) -> bool {
    p.iter().copied().all(f64::is_finite)
}
86
87impl<const D: usize> RandomCutForest<D> {
88    /// Replay historical `points` through the forest without
89    /// exposing any score — warms the reservoir so subsequent
90    /// [`Self::score`] calls return meaningful values from the first
91    /// live point.
92    ///
93    /// Non-finite points are silently skipped and tallied in the
94    /// report. All other errors from [`Self::update`] are propagated.
95    ///
96    /// # Errors
97    ///
98    /// Propagates [`Self::update`] failures other than
99    /// [`RcfError::NaNValue`] (which is absorbed and counted as a
100    /// skip).
101    pub fn bootstrap<I>(&mut self, points: I) -> RcfResult<BootstrapReport>
102    where
103        I: IntoIterator<Item = [f64; D]>,
104    {
105        let mut ingested: u64 = 0;
106        let mut skipped: u64 = 0;
107        for p in points {
108            if !is_finite_point(&p) {
109                skipped = skipped.saturating_add(1);
110                continue;
111            }
112            match self.update(p) {
113                Ok(()) => ingested = ingested.saturating_add(1),
114                Err(RcfError::NaNValue) => skipped = skipped.saturating_add(1),
115                Err(other) => return Err(other),
116            }
117        }
118        #[cfg(feature = "std")]
119        {
120            use crate::metrics::names;
121            let sink = self.metrics_sink();
122            sink.inc_counter(names::BOOTSTRAP_POINTS_TOTAL, ingested);
123            sink.inc_counter(names::BOOTSTRAP_SKIPPED_TOTAL, skipped);
124        }
125        Ok(BootstrapReport {
126            points_ingested: ingested,
127            points_skipped: skipped,
128            final_observations: self.updates_seen(),
129            final_threshold: 0.0,
130        })
131    }
132}
133
134impl<const D: usize> ThresholdedForest<D> {
135    /// Replay historical `points` through the thresholded detector,
136    /// folding each one into the forest *and* the score-stream EMA
137    /// so the adaptive threshold is hot before the first live point.
138    ///
139    /// Graded verdicts produced during the replay are discarded —
140    /// they would be misleading for historical data. The detector
141    /// is ready for live traffic as soon as
142    /// [`BootstrapReport::final_observations`] passes the configured
143    /// `min_observations` threshold.
144    ///
145    /// Non-finite points are skipped and tallied in the report.
146    ///
147    /// # Errors
148    ///
149    /// Propagates [`Self::process`] failures other than
150    /// [`RcfError::NaNValue`] (absorbed and counted as a skip).
151    pub fn bootstrap<I>(&mut self, points: I) -> RcfResult<BootstrapReport>
152    where
153        I: IntoIterator<Item = [f64; D]>,
154    {
155        let mut ingested: u64 = 0;
156        let mut skipped: u64 = 0;
157        for p in points {
158            if !is_finite_point(&p) {
159                skipped = skipped.saturating_add(1);
160                continue;
161            }
162            match self.process(p) {
163                Ok(_) => ingested = ingested.saturating_add(1),
164                Err(RcfError::NaNValue) => skipped = skipped.saturating_add(1),
165                Err(other) => return Err(other),
166            }
167        }
168        #[cfg(feature = "std")]
169        {
170            use crate::metrics::names;
171            let sink = self.metrics_sink();
172            sink.inc_counter(names::BOOTSTRAP_POINTS_TOTAL, ingested);
173            sink.inc_counter(names::BOOTSTRAP_SKIPPED_TOTAL, skipped);
174        }
175        Ok(BootstrapReport {
176            points_ingested: ingested,
177            points_skipped: skipped,
178            final_observations: self.stats().observations(),
179            final_threshold: self.current_threshold(),
180        })
181    }
182}
183
#[cfg(test)]
#[allow(clippy::float_cmp)] // Tests assert bounds on closed-form quantities.
mod tests {
    use super::*;
    use crate::{ForestBuilder, ThresholdedForestBuilder};

    /// Builds `len` seeded 4-D points. Each component is a `ChaCha8Rng`
    /// `f64` draw scaled by 0.1, giving a tight cluster to warm a
    /// detector with.
    fn small_uniform_history(seed: u64, len: usize) -> Vec<[f64; 4]> {
        use rand::{RngExt, SeedableRng};
        use rand_chacha::ChaCha8Rng;

        let mut rng = ChaCha8Rng::seed_from_u64(seed);
        let mut history = Vec::with_capacity(len);
        for _ in 0..len {
            // Array-literal elements evaluate left to right, so the
            // components consume RNG draws in index order.
            history.push([
                rng.random::<f64>() * 0.1,
                rng.random::<f64>() * 0.1,
                rng.random::<f64>() * 0.1,
                rng.random::<f64>() * 0.1,
            ]);
        }
        history
    }

    #[test]
    fn bootstrap_report_empty_defaults() {
        let report = BootstrapReport::empty();
        assert_eq!(report, BootstrapReport::default());
        assert_eq!(
            (
                report.points_ingested,
                report.points_skipped,
                report.final_observations
            ),
            (0, 0, 0)
        );
        assert_eq!(report.final_threshold, 0.0);
        assert!(!report.is_hot());
    }

    #[test]
    fn forest_bootstrap_from_empty_iter_is_noop() {
        let mut forest = ForestBuilder::<2>::new().seed(1).build().unwrap();
        let report = forest
            .bootstrap(std::iter::empty::<[f64; 2]>())
            .unwrap();
        assert!(!report.is_hot());
        assert_eq!(report.points_ingested, 0);
        assert_eq!(forest.updates_seen(), 0);
    }

    #[test]
    fn forest_bootstrap_counts_ingested_and_skipped() {
        let mut forest = ForestBuilder::<2>::new().seed(1).build().unwrap();
        let rows: Vec<[f64; 2]> = vec![
            [0.0, 0.0],
            [1.0, 1.0],
            [f64::NAN, 0.0], // skipped
            [2.0, 2.0],
            [0.0, f64::INFINITY], // skipped
        ];
        let report = forest.bootstrap(rows).unwrap();
        assert_eq!(report.points_ingested, 3);
        assert_eq!(report.points_skipped, 2);
        assert_eq!(report.final_observations, 3);
        assert_eq!(forest.updates_seen(), 3);
    }

    #[test]
    fn thresholded_bootstrap_makes_detector_ready() {
        let mut detector = ThresholdedForestBuilder::<4>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(32)
            .min_threshold(0.1)
            .seed(42)
            .build()
            .unwrap();

        let report = detector
            .bootstrap(small_uniform_history(42, 512))
            .unwrap();
        assert_eq!(report.points_ingested, 512);
        assert!(report.is_hot());
        assert!(report.final_observations >= 32, "should be past warmup");
        assert!(report.final_threshold > 0.1, "threshold should be adaptive");

        // The very first live probe must already be past warmup.
        let verdict = detector.score_only(&[0.05, 0.05, 0.05, 0.05]).unwrap();
        assert!(verdict.ready(), "detector must be hot after bootstrap");
    }

    #[test]
    fn thresholded_bootstrap_detects_outlier_immediately() {
        let mut detector = ThresholdedForestBuilder::<4>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(32)
            .min_threshold(0.1)
            .seed(3)
            .build()
            .unwrap();

        detector.bootstrap(small_uniform_history(3, 512)).unwrap();

        let outlier = detector.process([50.0; 4]).unwrap();
        assert!(outlier.ready());
        assert!(outlier.is_anomaly());
        assert!(outlier.grade() > 0.0);
    }

    #[test]
    fn thresholded_bootstrap_skips_non_finite() {
        let mut detector = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(16)
            .min_observations(4)
            .seed(1)
            .build()
            .unwrap();
        let rows: Vec<[f64; 2]> = vec![
            [0.0, 0.0],
            [f64::NAN, 0.0],
            [0.5, 0.5],
            [f64::NEG_INFINITY, 1.0],
        ];
        let report = detector.bootstrap(rows).unwrap();
        assert_eq!(report.points_ingested, 2);
        assert_eq!(report.points_skipped, 2);
    }
}