Skip to main content

anomstream_core/thresholded/
detector.rs

1//! Adaptive-threshold wrapper around [`RandomCutForest`].
2//!
3//! [`ThresholdedForest`] composes a [`RandomCutForest`] with an EMA of
4//! the anomaly-score stream ([`EmaStats`]) and derives a continuously
5//! updated threshold:
6//!
7//! ```text
8//! threshold  = max(min_threshold, mean + z_factor · stddev)
9//! is_anomaly = ready && score > threshold
10//! grade      = clamp01( (score − threshold) / (z_factor · stddev) )  if ready
11//!            | 0.0                                                   otherwise
12//! ```
13//!
14//! where `ready` means the detector has seen at least
15//! `min_observations` points *and* the running stddev is strictly
16//! positive. The `ready` flag guards the cold-start period so callers
17//! never see spurious anomaly verdicts on the first few points (the
18//! EMA has not yet converged and the bootstrap variance is exactly
19//! zero).
20//!
21//! # Scoring protocol
22//!
23//! `process(point)` evaluates the point *before* inserting it into
24//! the forest. This avoids a self-referential bias where the freshly
25//! inserted point would be scored against a forest that already
26//! contains it — always shallow, always low-anomaly. The stats EMA
27//! is updated with the pre-insert score so the threshold adapts to
28//! the distribution of scores the forest would assign to unseen
29//! points.
30
31use alloc::vec::Vec;
32
33use crate::config::RcfConfig;
34use crate::domain::point::ensure_finite;
35use crate::domain::{AnomalyScore, DiVector};
36use crate::error::{RcfError, RcfResult};
37use crate::forest::RandomCutForest;
38use crate::thresholded::config::ThresholdedConfig;
39use crate::thresholded::grade::AnomalyGrade;
40use crate::thresholded::stats::EmaStats;
41
42/// Adaptive-threshold detector composed of a [`RandomCutForest`] plus
43/// a running EMA of the anomaly-score stream.
44///
45/// Instantiate via [`crate::ThresholdedForestBuilder`]. The type
46/// parameter `D` is the per-point dimensionality, pinned at compile
47/// time exactly like the bare [`RandomCutForest`].
48///
49/// # Examples
50///
51/// ```
52/// use anomstream_core::ThresholdedForestBuilder;
53///
54/// let mut detector = ThresholdedForestBuilder::<2>::new()
55///     .num_trees(50)
56///     .sample_size(64)
57///     .min_observations(4)
58///     .seed(42)
59///     .build()
60///     .unwrap();
61/// for i in 0..64 {
62///     let v = f64::from(i) * 0.01;
63///     let _ = detector.process([v, v + 0.5]).unwrap();
64/// }
65/// let verdict = detector.process([10.0, 10.0]).unwrap();
66/// assert!(verdict.ready());
67/// ```
68#[derive(Debug)]
69#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
70pub struct ThresholdedForest<const D: usize> {
71    /// Underlying random cut forest.
72    forest: RandomCutForest<D>,
73    /// Threshold-layer configuration.
74    thresholded: ThresholdedConfig,
75    /// Running mean/variance of the per-point anomaly scores.
76    stats: EmaStats,
77    /// Streaming quantile estimator over the score stream. Only
78    /// consulted when `thresholded.threshold_mode` is
79    /// [`crate::thresholded::ThresholdMode::Quantile`]. Kept
80    /// populated under both modes so mode swaps at runtime (not
81    /// supported today, but persistence migrations tomorrow) do
82    /// not start from a cold digest.
83    tdigest: crate::TDigest,
84    /// Observability sink for threshold-layer events. Distinct from
85    /// the inner forest's sink so wrapper-only metrics
86    /// (`rcf_process_total`, `rcf_anomalies_fired_total`,
87    /// `rcf_threshold_current`, `rcf_grade`) do not duplicate the
88    /// forest's counters.
89    #[cfg(feature = "std")]
90    #[cfg_attr(
91        feature = "serde",
92        serde(skip, default = "crate::metrics::default_sink")
93    )]
94    metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
95}
96
97impl<const D: usize> ThresholdedForest<D> {
98    /// Low-level constructor used by [`crate::ThresholdedForestBuilder::build`].
99    ///
100    /// Both `forest` and `thresholded` are expected to have been
101    /// validated upstream; this function only wires them together and
102    /// constructs the EMA.
103    ///
104    /// # Errors
105    ///
106    /// Propagates [`EmaStats::new`] failures (non-finite decay etc.).
107    pub fn from_parts(
108        forest: RandomCutForest<D>,
109        thresholded: ThresholdedConfig,
110    ) -> RcfResult<Self> {
111        thresholded.validate()?;
112        let stats = EmaStats::new(thresholded.score_decay)?;
113        let tdigest = crate::TDigest::with_default_compression();
114        Ok(Self {
115            forest,
116            thresholded,
117            stats,
118            tdigest,
119            #[cfg(feature = "std")]
120            metrics: crate::metrics::default_sink(),
121        })
122    }
123
124    /// Install a [`crate::MetricsSink`] — every subsequent
125    /// `process` / `score_only` call emits counters and histograms
126    /// into it. Does **not** propagate to the underlying forest;
127    /// install on the forest separately if you also want low-level
128    /// `rcf_updates_total` / `rcf_score` / `rcf_deletes_total`
129    /// events.
130    #[cfg(feature = "std")]
131    #[must_use]
132    pub fn with_metrics_sink(
133        mut self,
134        sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
135    ) -> Self {
136        self.metrics = sink;
137        self
138    }
139
140    /// Read-only handle to the installed threshold-layer sink.
141    #[cfg(feature = "std")]
142    #[must_use]
143    pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
144        &self.metrics
145    }
146
147    /// Read-only access to the underlying forest.
148    #[must_use]
149    pub fn forest(&self) -> &RandomCutForest<D> {
150        &self.forest
151    }
152
153    /// Read-only access to the forest configuration.
154    #[must_use]
155    pub fn forest_config(&self) -> &RcfConfig {
156        self.forest.config()
157    }
158
159    /// Threshold-layer configuration.
160    #[must_use]
161    pub fn thresholded_config(&self) -> &ThresholdedConfig {
162        &self.thresholded
163    }
164
165    /// Running statistics of the anomaly-score stream.
166    #[must_use]
167    pub fn stats(&self) -> &EmaStats {
168        &self.stats
169    }
170
171    /// Current adaptive threshold. Clamped to the configured floor
172    /// whenever the detector has not yet accumulated enough
173    /// observations to trust the running statistic (stddev under
174    /// `ZSigma`, quantile under `Quantile`).
175    #[must_use]
176    pub fn current_threshold(&self) -> f64 {
177        use crate::thresholded::ThresholdMode;
178        if self.stats.observations() < self.thresholded.min_observations {
179            return self.thresholded.min_threshold;
180        }
181        let adaptive = match self.thresholded.threshold_mode {
182            ThresholdMode::ZSigma { z_factor } => {
183                if self.stats.stddev() <= 0.0 {
184                    return self.thresholded.min_threshold;
185                }
186                self.stats.mean() + z_factor * self.stats.stddev()
187            }
188            ThresholdMode::Quantile { p } => {
189                // Force a flush so the query sees every recorded
190                // score; `quantile` does the flush itself, but the
191                // `&self` borrow here forbids it — rely on the
192                // periodic flushes `flush_buffer` does on `record`.
193                match self.tdigest_quantile_readonly(p) {
194                    Some(q) => q,
195                    None => return self.thresholded.min_threshold,
196                }
197            }
198        };
199        adaptive.max(self.thresholded.min_threshold)
200    }
201
202    /// Immutable quantile lookup — equivalent to `TDigest::quantile`
203    /// but clones only the centroids + `min`/`max` so the detector
204    /// can stay `&self`. Cheap when the buffer is empty (expected
205    /// after any `record` call with `buffer_len > compression * 10`),
206    /// but callers on the hot-path should prefer letting the digest
207    /// flush itself.
208    fn tdigest_quantile_readonly(&self, p: f64) -> Option<f64> {
209        if self.tdigest.total_weight() <= 0.0 {
210            return None;
211        }
212        let mut scratch = self.tdigest.clone();
213        scratch.quantile(p)
214    }
215
216    /// Score `point` against the *current* forest, grade it against
217    /// the adaptive threshold, insert it into the forest, then fold
218    /// the score into the running statistics.
219    ///
220    /// The first call returns a warming-up verdict (`ready = false`,
221    /// `is_anomaly = false`) because the forest holds no leaves yet.
222    /// Subsequent calls within the `min_observations` warmup window
223    /// also return `ready = false`.
224    ///
225    /// # Errors
226    ///
227    /// - [`RcfError::NaNValue`] when the point contains a non-finite
228    ///   component.
229    /// - Any error bubbled up from [`RandomCutForest::update`] or
230    ///   [`RandomCutForest::score`].
231    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
232    pub fn process(&mut self, point: [f64; D]) -> RcfResult<AnomalyGrade> {
233        ensure_finite(&point)?;
234
235        // Score against the forest BEFORE the insert — a post-insert
236        // score would bias the result toward "seen" for the freshly
237        // inserted point and distort the threshold-driving statistics.
238        let score = match self.forest.score(&point) {
239            Ok(s) => s,
240            Err(RcfError::EmptyForest) => {
241                // Cold start: no leaves yet. Record the insert,
242                // emit a warming-up verdict, do not update stats.
243                self.forest.update(point)?;
244                let verdict = AnomalyGrade::new(
245                    AnomalyScore::new(0.0)?,
246                    self.thresholded.min_threshold,
247                    0.0,
248                    false,
249                    false,
250                )?;
251                #[cfg(feature = "std")]
252                self.emit_process_metrics(&verdict);
253                return Ok(verdict);
254            }
255            Err(other) => return Err(other),
256        };
257
258        self.forest.update(point)?;
259
260        let verdict = self.grade_from_score(score)?;
261        self.record_score(f64::from(score));
262        #[cfg(feature = "std")]
263        self.emit_process_metrics(&verdict);
264        Ok(verdict)
265    }
266
267    /// Score `point` and grade it without touching the forest or the
268    /// running statistics. Useful for re-evaluating a point against a
269    /// snapshot of the model without contaminating the training
270    /// stream.
271    ///
272    /// On an empty forest (no points have been inserted yet), returns
273    /// a warming-up verdict (`ready = false`, `is_anomaly = false`)
274    /// rather than an error — mirrors [`Self::process`]'s cold-start
275    /// handling so callers can query the detector during the warmup
276    /// window without special-casing the empty case.
277    ///
278    /// # Errors
279    ///
280    /// - [`RcfError::NaNValue`] when the point contains a non-finite
281    ///   component.
282    /// - Any other error bubbled up from [`RandomCutForest::score`].
283    pub fn score_only(&self, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
284        match self.forest.score(point) {
285            Ok(score) => self.grade_from_score(score),
286            Err(RcfError::EmptyForest) => AnomalyGrade::new(
287                AnomalyScore::new(0.0)?,
288                self.thresholded.min_threshold,
289                0.0,
290                false,
291                false,
292            ),
293            Err(other) => Err(other),
294        }
295    }
296
297    /// Compute the per-feature attribution of `point`'s anomaly score
298    /// against the underlying forest. Forwarded to
299    /// [`RandomCutForest::attribution`]; the threshold layer has no
300    /// bearing on attribution.
301    ///
302    /// # Errors
303    ///
304    /// Same as [`RandomCutForest::attribution`].
305    pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
306        self.forest.attribution(point)
307    }
308
309    /// Bulk-score a batch of points without touching the threshold
310    /// layer's stats. Returns [`AnomalyGrade`]s graded against the
311    /// current adaptive threshold — identical to what
312    /// [`Self::score_only`] would emit per point, but parallelised
313    /// across the batch via rayon when the `parallel` feature is
314    /// enabled.
315    ///
316    /// # Errors
317    ///
318    /// Propagates any [`Self::score_only`] error hit while
319    /// processing the batch.
320    pub fn score_only_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyGrade>> {
321        #[cfg(feature = "parallel")]
322        {
323            use rayon::prelude::*;
324            points
325                .par_iter()
326                .map(|p| self.score_only(p))
327                .collect::<RcfResult<Vec<_>>>()
328        }
329        #[cfg(not(feature = "parallel"))]
330        {
331            points.iter().map(|p| self.score_only(p)).collect()
332        }
333    }
334
335    /// Bulk per-feature attribution. Delegates to
336    /// [`RandomCutForest::attribution_many`].
337    ///
338    /// # Errors
339    ///
340    /// Same as [`RandomCutForest::attribution_many`].
341    pub fn attribution_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
342        self.forest.attribution_many(points)
343    }
344
345    /// Imputation-like forensic baseline. Delegates to
346    /// [`RandomCutForest::forensic_baseline`].
347    ///
348    /// # Errors
349    ///
350    /// Same as [`RandomCutForest::forensic_baseline`].
351    pub fn forensic_baseline(
352        &self,
353        point: &[f64; D],
354    ) -> RcfResult<crate::forensic::ForensicBaseline<D>> {
355        self.forest.forensic_baseline(point)
356    }
357
358    /// Bulk early-termination scoring. Delegates to
359    /// [`RandomCutForest::score_many_early_term`] — the threshold
360    /// layer does not alter the scoring path.
361    ///
362    /// # Errors
363    ///
364    /// Same as [`RandomCutForest::score_many_early_term`].
365    pub fn score_many_early_term(
366        &self,
367        points: &[[f64; D]],
368        config: crate::early_term::EarlyTermConfig,
369    ) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
370        self.forest.score_many_early_term(points, config)
371    }
372
373    /// Early-termination variant of the scoring path — delegates to
374    /// [`RandomCutForest::score_early_term`]. Does not update the
375    /// thresholded layer's stats (this is a read path, not a
376    /// training path).
377    ///
378    /// # Errors
379    ///
380    /// Same as [`RandomCutForest::score_early_term`].
381    pub fn score_early_term(
382        &self,
383        point: &[f64; D],
384        config: crate::early_term::EarlyTermConfig,
385    ) -> RcfResult<crate::early_term::EarlyTermScore> {
386        self.forest.score_early_term(point, config)
387    }
388
389    /// Drop every statistic and warm-up sample. The underlying forest
390    /// is left untouched — callers who want a full reset should
391    /// rebuild via the builder. Used by tests and by callers that
392    /// want to re-enter a warmup phase after a major regime change.
393    pub fn reset_stats(&mut self) {
394        self.stats.reset();
395        self.tdigest.reset();
396    }
397
398    /// Retract a previously-observed point from the underlying forest
399    /// by its `point_idx`. Delegates to
400    /// [`RandomCutForest::delete`] — the threshold layer's stats are
401    /// left untouched (they already reflect the score that was
402    /// emitted when the point was processed).
403    ///
404    /// # Errors
405    ///
406    /// Same as [`RandomCutForest::delete`].
407    pub fn delete(&mut self, point_idx: usize) -> RcfResult<bool> {
408        self.forest.delete(point_idx)
409    }
410
411    /// Retract every point whose stored value bit-matches `point`.
412    /// Delegates to [`RandomCutForest::delete_by_value`].
413    ///
414    /// # Errors
415    ///
416    /// Same as [`RandomCutForest::delete_by_value`].
417    pub fn delete_by_value(&mut self, point: &[f64; D]) -> RcfResult<usize> {
418        self.forest.delete_by_value(point)
419    }
420
421    /// Same as [`Self::process`] but returns the `point_idx` the
422    /// underlying forest assigned to the fresh observation, paired
423    /// with the usual graded verdict. Callers that want to later
424    /// retract the observation via [`Self::delete`] should store the
425    /// index from this call.
426    ///
427    /// # Errors
428    ///
429    /// Same as [`Self::process`].
430    pub fn process_indexed(&mut self, point: [f64; D]) -> RcfResult<(usize, AnomalyGrade)> {
431        ensure_finite(&point)?;
432
433        let score = match self.forest.score(&point) {
434            Ok(s) => s,
435            Err(RcfError::EmptyForest) => {
436                // Cold start: bypass the normal scoring path, mirror
437                // `process`'s warming-up verdict, and return the
438                // fresh point_idx from the underlying insert.
439                let idx = self.forest.update_indexed(point)?;
440                let grade = AnomalyGrade::new(
441                    AnomalyScore::new(0.0)?,
442                    self.thresholded.min_threshold,
443                    0.0,
444                    false,
445                    false,
446                )?;
447                #[cfg(feature = "std")]
448                self.emit_process_metrics(&grade);
449                return Ok((idx, grade));
450            }
451            Err(other) => return Err(other),
452        };
453
454        let idx = self.forest.update_indexed(point)?;
455        let verdict = self.grade_from_score(score)?;
456        self.record_score(f64::from(score));
457        #[cfg(feature = "std")]
458        self.emit_process_metrics(&verdict);
459        Ok((idx, verdict))
460    }
461
462    /// Timestamped variant of [`Self::process`] — tags the freshly
463    /// inserted point with `timestamp` so callers can later prune
464    /// history via [`RandomCutForest::delete_before`]. Returns the
465    /// same graded verdict as [`Self::process`].
466    ///
467    /// # Errors
468    ///
469    /// Same as [`Self::process`].
470    pub fn process_at(&mut self, point: [f64; D], timestamp: u64) -> RcfResult<AnomalyGrade> {
471        let (_, verdict) = self.process_indexed_at(point, timestamp)?;
472        Ok(verdict)
473    }
474
475    /// Timestamped variant of [`Self::process_indexed`] — records
476    /// the caller-supplied `timestamp` against the fresh `point_idx`.
477    ///
478    /// # Errors
479    ///
480    /// Same as [`Self::process_indexed`].
481    pub fn process_indexed_at(
482        &mut self,
483        point: [f64; D],
484        timestamp: u64,
485    ) -> RcfResult<(usize, AnomalyGrade)> {
486        let (idx, verdict) = self.process_indexed(point)?;
487        // process_indexed may have called update_indexed, so the
488        // side-map entry we tag here is attached to the correct
489        // fresh point_idx — even when the call path went through
490        // the cold-start warming-up branch.
491        if self.forest.point_store().ref_count(idx) > 0 {
492            self.forest.set_point_timestamp(idx, timestamp);
493        }
494        Ok((idx, verdict))
495    }
496
497    /// Retract every point whose timestamp is strictly less than
498    /// `cutoff`. Forwards to [`RandomCutForest::delete_before`].
499    ///
500    /// # Errors
501    ///
502    /// Propagates [`RandomCutForest::delete_before`] failures.
503    pub fn delete_before(&mut self, cutoff: u64) -> RcfResult<usize> {
504        self.forest.delete_before(cutoff)
505    }
506
507    /// Emit the counters / gauges / histograms associated with a
508    /// completed `process` call. Called once per public process
509    /// entry so cold-start warming-up verdicts are counted too.
510    #[cfg(feature = "std")]
511    fn emit_process_metrics(&self, verdict: &AnomalyGrade) {
512        use crate::metrics::names;
513        self.metrics.inc_counter(names::PROCESS_TOTAL, 1);
514        self.metrics
515            .observe_histogram(names::GRADE_OBSERVATION, verdict.grade());
516        if verdict.is_anomaly() {
517            self.metrics.inc_counter(names::ANOMALIES_FIRED_TOTAL, 1);
518        }
519        self.metrics
520            .set_gauge(names::THRESHOLD_CURRENT, self.current_threshold());
521        self.metrics.set_gauge(names::EMA_MEAN, self.stats.mean());
522        self.metrics
523            .set_gauge(names::EMA_STDDEV, self.stats.stddev());
524        #[allow(clippy::cast_precision_loss)]
525        self.metrics
526            .set_gauge(names::OBSERVATIONS_SEEN, self.stats.observations() as f64);
527    }
528
529    /// Translate a raw anomaly score into a graded verdict using the
530    /// current running statistics. Handles both
531    /// [`crate::thresholded::ThresholdMode`] variants.
532    fn grade_from_score(&self, score: AnomalyScore) -> RcfResult<AnomalyGrade> {
533        use crate::thresholded::ThresholdMode;
534        if self.stats.observations() < self.thresholded.min_observations {
535            return AnomalyGrade::new(score, self.thresholded.min_threshold, 0.0, false, false);
536        }
537
538        let raw = f64::from(score);
539        let (threshold, span) = match self.thresholded.threshold_mode {
540            ThresholdMode::ZSigma { z_factor } => {
541                let stddev = self.stats.stddev();
542                if stddev <= 0.0 {
543                    return AnomalyGrade::new(
544                        score,
545                        self.thresholded.min_threshold,
546                        0.0,
547                        false,
548                        false,
549                    );
550                }
551                let adaptive = self.stats.mean() + z_factor * stddev;
552                let t = adaptive.max(self.thresholded.min_threshold);
553                (t, z_factor * stddev)
554            }
555            ThresholdMode::Quantile { p } => {
556                let Some(q) = self.tdigest_quantile_readonly(p) else {
557                    return AnomalyGrade::new(
558                        score,
559                        self.thresholded.min_threshold,
560                        0.0,
561                        false,
562                        false,
563                    );
564                };
565                let t = q.max(self.thresholded.min_threshold);
566                // Grade span: distance from `p` to `max` on the
567                // empirical distribution. Heavy-tail-aware: a score
568                // at the observed maximum grades 1.0 whatever the
569                // absolute magnitude.
570                let max = self.tdigest.max().unwrap_or(t);
571                let sp = (max - t).max(f64::EPSILON);
572                (t, sp)
573            }
574        };
575
576        if raw <= threshold {
577            return AnomalyGrade::new(score, threshold, 0.0, false, true);
578        }
579
580        let grade = if span > 0.0 {
581            ((raw - threshold) / span).clamp(0.0, 1.0)
582        } else {
583            1.0
584        };
585        AnomalyGrade::new(score, threshold, grade, true, true)
586    }
587
588    /// Fold `score` into both the EMA (always) and the `TDigest`
589    /// (always — a mode swap should see a populated digest). Kept
590    /// private: every entry point that previously called
591    /// `self.stats.update(f64::from(score))` must route through
592    /// here instead.
593    fn record_score(&mut self, raw: f64) {
594        self.stats.update(raw);
595        self.tdigest.record(raw);
596    }
597}
598
599impl<const D: usize> crate::forest::ForestSnapshot for ThresholdedForest<D> {
600    fn snapshot_num_trees(&self) -> usize {
601        self.forest.num_trees()
602    }
603    fn snapshot_sample_size(&self) -> usize {
604        self.forest.sample_size()
605    }
606    fn snapshot_dimension(&self) -> usize {
607        self.forest.dimension()
608    }
609    fn snapshot_live_points(&self) -> usize {
610        self.forest.point_store().live_count()
611    }
612    fn snapshot_updates_seen(&self) -> u64 {
613        self.forest.updates_seen()
614    }
615    fn snapshot_memory_estimate(&self) -> usize {
616        self.forest.memory_estimate()
617    }
618}
619
#[cfg(test)]
#[allow(clippy::float_cmp)] // Exact comparisons target closed-form values (zero grade, configured floor).
mod tests {
    use super::*;
    use crate::thresholded::config::ThresholdedForestBuilder;

    /// Detector with 50 trees, 64 samples, a zero threshold floor and
    /// a fixed seed; only the warmup length varies between tests.
    fn detector<const D: usize>(min_obs: u64) -> ThresholdedForest<D> {
        ThresholdedForestBuilder::<D>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(min_obs)
            .min_threshold(0.0)
            .seed(42)
            .build()
            .unwrap()
    }

    /// `i`-th point of the deterministic ramp used as "normal" data.
    fn ramp_point(i: i32) -> [f64; 2] {
        let v = f64::from(i) * 0.01;
        [v, v + 0.5]
    }

    /// Process the first `steps` ramp points, ignoring the verdicts.
    fn feed_ramp(d: &mut ThresholdedForest<2>, steps: i32) {
        for i in 0..steps {
            d.process(ramp_point(i)).unwrap();
        }
    }

    /// Process `count` points drawn component by component from `rng`.
    fn feed_uniform(d: &mut ThresholdedForest<2>, rng: &mut rand_chacha::ChaCha8Rng, count: usize) {
        use rand::RngExt;
        for _ in 0..count {
            let a: f64 = rng.random();
            let b: f64 = rng.random();
            d.process([a, b]).unwrap();
        }
    }

    #[test]
    fn cold_start_emits_warming_up_verdict() {
        let mut d = detector::<2>(8);
        let first = d.process([0.1, 0.2]).unwrap();
        assert!(!first.ready());
        assert!(!first.is_anomaly());
        assert_eq!(first.grade(), 0.0);
    }

    #[test]
    fn warmup_period_always_not_ready() {
        let mut d = detector::<2>(32);
        for i in 0..20 {
            let verdict = d.process(ramp_point(i)).unwrap();
            assert!(!verdict.ready(), "should still be warming up at i={i}");
        }
    }

    #[test]
    fn becomes_ready_after_min_observations() {
        let mut d = detector::<2>(8);
        feed_ramp(&mut d, 64);
        // Probe a point next to the ramp; by now observations exceed
        // min_obs and the stddev should be positive.
        let verdict = d.process([0.64, 1.14]).unwrap();
        assert!(verdict.ready());
    }

    #[test]
    fn rejects_non_finite_point() {
        let mut d = detector::<2>(8);
        let err = d.process([f64::NAN, 0.0]).unwrap_err();
        assert!(matches!(err, RcfError::NaNValue));
    }

    #[test]
    fn score_only_does_not_mutate_stats() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        let seen_before = d.stats().observations();
        let _ = d.score_only(&[10.0, 10.0]).unwrap();
        assert_eq!(d.stats().observations(), seen_before);
    }

    #[test]
    fn outlier_grades_above_cluster_member() {
        let mut d = detector::<2>(8);
        feed_ramp(&mut d, 128);
        let cluster = d.score_only(&[0.3, 0.8]).unwrap();
        let outlier = d.score_only(&[20.0, 20.0]).unwrap();
        let cluster_score = f64::from(cluster.score());
        let outlier_score = f64::from(outlier.score());
        assert!(outlier_score > cluster_score);
    }

    #[test]
    fn current_threshold_respects_min_floor_during_warmup() {
        let d = detector::<2>(16);
        assert_eq!(d.current_threshold(), 0.0);
    }

    #[test]
    fn quantile_threshold_mode_fires_on_tail_spike() {
        use rand::SeedableRng;
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(16)
            .min_threshold(0.01)
            .quantile_threshold(0.95)
            .seed(19)
            .build()
            .unwrap();
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(29);
        // Warm up on in-distribution points.
        feed_uniform(&mut d, &mut rng, 512);
        let warm_threshold = d.current_threshold();
        assert!(warm_threshold > 0.01, "threshold should lift off the floor");
        // A far-away point should land above the p95 of the warm
        // score distribution and therefore be flagged.
        let outlier = d.process([100.0, -100.0]).unwrap();
        assert!(outlier.ready());
        assert!(outlier.is_anomaly());
    }

    #[test]
    fn quantile_threshold_rejects_invalid_p() {
        let err = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .quantile_threshold(1.5) // outside (0, 1)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn current_threshold_above_floor_when_stddev_positive() {
        use rand::SeedableRng;
        // With a non-zero floor, the threshold must never drop below
        // it once the statistics have converged.
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(8)
            .min_threshold(0.01)
            .seed(3)
            .build()
            .unwrap();
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(17);
        feed_uniform(&mut d, &mut rng, 256);
        assert!(d.current_threshold() >= 0.01);
    }

    #[test]
    fn attribution_forwards_to_forest() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        let di = d.attribution(&[10.0, 10.0]).unwrap();
        assert_eq!(di.dim(), 2);
    }

    #[test]
    fn reset_stats_sends_detector_back_to_warmup() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        assert!(d.stats().observations() > 0);
        d.reset_stats();
        assert_eq!(d.stats().observations(), 0);
        // The very next verdict must be a warming-up one again.
        let after_reset = d.process([0.5, 1.0]).unwrap();
        assert!(!after_reset.ready());
    }

    #[test]
    fn accessors_expose_inner_state() {
        let d = detector::<4>(8);
        assert_eq!(d.forest().num_trees(), 50);
        assert_eq!(d.forest_config().sample_size, 64);
        assert_eq!(d.thresholded_config().min_observations, 8);
        assert_eq!(d.stats().observations(), 0);
    }
}