anomstream_core/thresholded/detector.rs
1//! Adaptive-threshold wrapper around [`RandomCutForest`].
2//!
3//! [`ThresholdedForest`] composes a [`RandomCutForest`] with an EMA of
4//! the anomaly-score stream ([`EmaStats`]) and derives a continuously
5//! updated threshold:
6//!
7//! ```text
8//! threshold = max(min_threshold, mean + z_factor · stddev)
9//! is_anomaly = ready && score > threshold
10//! grade = clamp01( (score − threshold) / (z_factor · stddev) ) if ready
11//! | 0.0 otherwise
12//! ```
13//!
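//! where `ready` means the detector has seen at least
//! `min_observations` scores *and* the running statistic is usable:
//! a strictly positive stddev under the z-sigma mode shown above, or
//! a score digest that can answer the quantile query under the
//! quantile mode (which uses the configured quantile of the score
//! stream as the adaptive term instead of `mean + z_factor · stddev`).
//! The `ready` flag guards the cold-start period so callers never see
//! spurious anomaly verdicts on the first few points (the EMA has not
//! yet converged and the bootstrap variance is exactly zero).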
20//!
21//! # Scoring protocol
22//!
23//! `process(point)` evaluates the point *before* inserting it into
24//! the forest. This avoids a self-referential bias where the freshly
25//! inserted point would be scored against a forest that already
26//! contains it — always shallow, always low-anomaly. The stats EMA
27//! is updated with the pre-insert score so the threshold adapts to
28//! the distribution of scores the forest would assign to unseen
29//! points.
30
31use alloc::vec::Vec;
32
33use crate::config::RcfConfig;
34use crate::domain::point::ensure_finite;
35use crate::domain::{AnomalyScore, DiVector};
36use crate::error::{RcfError, RcfResult};
37use crate::forest::RandomCutForest;
38use crate::thresholded::config::ThresholdedConfig;
39use crate::thresholded::grade::AnomalyGrade;
40use crate::thresholded::stats::EmaStats;
41
/// Adaptive-threshold detector composed of a [`RandomCutForest`] plus
/// running statistics of the anomaly-score stream (an EMA and a
/// streaming quantile digest).
///
/// Instantiate via [`crate::ThresholdedForestBuilder`]. The type
/// parameter `D` is the per-point dimensionality, pinned at compile
/// time exactly like the bare [`RandomCutForest`].
///
/// # Examples
///
/// ```
/// use anomstream_core::ThresholdedForestBuilder;
///
/// let mut detector = ThresholdedForestBuilder::<2>::new()
///     .num_trees(50)
///     .sample_size(64)
///     .min_observations(4)
///     .seed(42)
///     .build()
///     .unwrap();
/// for i in 0..64 {
///     let v = f64::from(i) * 0.01;
///     let _ = detector.process([v, v + 0.5]).unwrap();
/// }
/// let verdict = detector.process([10.0, 10.0]).unwrap();
/// assert!(verdict.ready());
/// ```
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ThresholdedForest<const D: usize> {
    /// Underlying random cut forest that produces the raw scores.
    forest: RandomCutForest<D>,
    /// Threshold-layer configuration (floor, warmup length, mode, decay).
    thresholded: ThresholdedConfig,
    /// Running mean/variance of the per-point anomaly scores.
    stats: EmaStats,
    /// Streaming quantile estimator over the score stream. Only
    /// consulted when `thresholded.threshold_mode` is
    /// [`crate::thresholded::ThresholdMode::Quantile`]. Kept
    /// populated under both modes so mode swaps at runtime (not
    /// supported today, but persistence migrations tomorrow) do
    /// not start from a cold digest.
    tdigest: crate::TDigest,
    /// Observability sink for threshold-layer events. Distinct from
    /// the inner forest's sink so wrapper-only metrics
    /// (`rcf_process_total`, `rcf_anomalies_fired_total`,
    /// `rcf_threshold_current`, `rcf_grade`) do not duplicate the
    /// forest's counters. Skipped by serde; a deserialized detector
    /// gets `crate::metrics::default_sink()` instead.
    #[cfg(feature = "std")]
    #[cfg_attr(
        feature = "serde",
        serde(skip, default = "crate::metrics::default_sink")
    )]
    metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}
96
97impl<const D: usize> ThresholdedForest<D> {
98 /// Low-level constructor used by [`crate::ThresholdedForestBuilder::build`].
99 ///
100 /// Both `forest` and `thresholded` are expected to have been
101 /// validated upstream; this function only wires them together and
102 /// constructs the EMA.
103 ///
104 /// # Errors
105 ///
106 /// Propagates [`EmaStats::new`] failures (non-finite decay etc.).
107 pub fn from_parts(
108 forest: RandomCutForest<D>,
109 thresholded: ThresholdedConfig,
110 ) -> RcfResult<Self> {
111 thresholded.validate()?;
112 let stats = EmaStats::new(thresholded.score_decay)?;
113 let tdigest = crate::TDigest::with_default_compression();
114 Ok(Self {
115 forest,
116 thresholded,
117 stats,
118 tdigest,
119 #[cfg(feature = "std")]
120 metrics: crate::metrics::default_sink(),
121 })
122 }
123
124 /// Install a [`crate::MetricsSink`] — every subsequent
125 /// `process` / `score_only` call emits counters and histograms
126 /// into it. Does **not** propagate to the underlying forest;
127 /// install on the forest separately if you also want low-level
128 /// `rcf_updates_total` / `rcf_score` / `rcf_deletes_total`
129 /// events.
130 #[cfg(feature = "std")]
131 #[must_use]
132 pub fn with_metrics_sink(
133 mut self,
134 sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
135 ) -> Self {
136 self.metrics = sink;
137 self
138 }
139
140 /// Read-only handle to the installed threshold-layer sink.
141 #[cfg(feature = "std")]
142 #[must_use]
143 pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
144 &self.metrics
145 }
146
147 /// Read-only access to the underlying forest.
148 #[must_use]
149 pub fn forest(&self) -> &RandomCutForest<D> {
150 &self.forest
151 }
152
153 /// Read-only access to the forest configuration.
154 #[must_use]
155 pub fn forest_config(&self) -> &RcfConfig {
156 self.forest.config()
157 }
158
159 /// Threshold-layer configuration.
160 #[must_use]
161 pub fn thresholded_config(&self) -> &ThresholdedConfig {
162 &self.thresholded
163 }
164
165 /// Running statistics of the anomaly-score stream.
166 #[must_use]
167 pub fn stats(&self) -> &EmaStats {
168 &self.stats
169 }
170
171 /// Current adaptive threshold. Clamped to the configured floor
172 /// whenever the detector has not yet accumulated enough
173 /// observations to trust the running statistic (stddev under
174 /// `ZSigma`, quantile under `Quantile`).
175 #[must_use]
176 pub fn current_threshold(&self) -> f64 {
177 use crate::thresholded::ThresholdMode;
178 if self.stats.observations() < self.thresholded.min_observations {
179 return self.thresholded.min_threshold;
180 }
181 let adaptive = match self.thresholded.threshold_mode {
182 ThresholdMode::ZSigma { z_factor } => {
183 if self.stats.stddev() <= 0.0 {
184 return self.thresholded.min_threshold;
185 }
186 self.stats.mean() + z_factor * self.stats.stddev()
187 }
188 ThresholdMode::Quantile { p } => {
189 // Force a flush so the query sees every recorded
190 // score; `quantile` does the flush itself, but the
191 // `&self` borrow here forbids it — rely on the
192 // periodic flushes `flush_buffer` does on `record`.
193 match self.tdigest_quantile_readonly(p) {
194 Some(q) => q,
195 None => return self.thresholded.min_threshold,
196 }
197 }
198 };
199 adaptive.max(self.thresholded.min_threshold)
200 }
201
202 /// Immutable quantile lookup — equivalent to `TDigest::quantile`
203 /// but clones only the centroids + `min`/`max` so the detector
204 /// can stay `&self`. Cheap when the buffer is empty (expected
205 /// after any `record` call with `buffer_len > compression * 10`),
206 /// but callers on the hot-path should prefer letting the digest
207 /// flush itself.
208 fn tdigest_quantile_readonly(&self, p: f64) -> Option<f64> {
209 if self.tdigest.total_weight() <= 0.0 {
210 return None;
211 }
212 let mut scratch = self.tdigest.clone();
213 scratch.quantile(p)
214 }
215
216 /// Score `point` against the *current* forest, grade it against
217 /// the adaptive threshold, insert it into the forest, then fold
218 /// the score into the running statistics.
219 ///
220 /// The first call returns a warming-up verdict (`ready = false`,
221 /// `is_anomaly = false`) because the forest holds no leaves yet.
222 /// Subsequent calls within the `min_observations` warmup window
223 /// also return `ready = false`.
224 ///
225 /// # Errors
226 ///
227 /// - [`RcfError::NaNValue`] when the point contains a non-finite
228 /// component.
229 /// - Any error bubbled up from [`RandomCutForest::update`] or
230 /// [`RandomCutForest::score`].
231 #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
232 pub fn process(&mut self, point: [f64; D]) -> RcfResult<AnomalyGrade> {
233 ensure_finite(&point)?;
234
235 // Score against the forest BEFORE the insert — a post-insert
236 // score would bias the result toward "seen" for the freshly
237 // inserted point and distort the threshold-driving statistics.
238 let score = match self.forest.score(&point) {
239 Ok(s) => s,
240 Err(RcfError::EmptyForest) => {
241 // Cold start: no leaves yet. Record the insert,
242 // emit a warming-up verdict, do not update stats.
243 self.forest.update(point)?;
244 let verdict = AnomalyGrade::new(
245 AnomalyScore::new(0.0)?,
246 self.thresholded.min_threshold,
247 0.0,
248 false,
249 false,
250 )?;
251 #[cfg(feature = "std")]
252 self.emit_process_metrics(&verdict);
253 return Ok(verdict);
254 }
255 Err(other) => return Err(other),
256 };
257
258 self.forest.update(point)?;
259
260 let verdict = self.grade_from_score(score)?;
261 self.record_score(f64::from(score));
262 #[cfg(feature = "std")]
263 self.emit_process_metrics(&verdict);
264 Ok(verdict)
265 }
266
267 /// Score `point` and grade it without touching the forest or the
268 /// running statistics. Useful for re-evaluating a point against a
269 /// snapshot of the model without contaminating the training
270 /// stream.
271 ///
272 /// On an empty forest (no points have been inserted yet), returns
273 /// a warming-up verdict (`ready = false`, `is_anomaly = false`)
274 /// rather than an error — mirrors [`Self::process`]'s cold-start
275 /// handling so callers can query the detector during the warmup
276 /// window without special-casing the empty case.
277 ///
278 /// # Errors
279 ///
280 /// - [`RcfError::NaNValue`] when the point contains a non-finite
281 /// component.
282 /// - Any other error bubbled up from [`RandomCutForest::score`].
283 pub fn score_only(&self, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
284 match self.forest.score(point) {
285 Ok(score) => self.grade_from_score(score),
286 Err(RcfError::EmptyForest) => AnomalyGrade::new(
287 AnomalyScore::new(0.0)?,
288 self.thresholded.min_threshold,
289 0.0,
290 false,
291 false,
292 ),
293 Err(other) => Err(other),
294 }
295 }
296
297 /// Compute the per-feature attribution of `point`'s anomaly score
298 /// against the underlying forest. Forwarded to
299 /// [`RandomCutForest::attribution`]; the threshold layer has no
300 /// bearing on attribution.
301 ///
302 /// # Errors
303 ///
304 /// Same as [`RandomCutForest::attribution`].
305 pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
306 self.forest.attribution(point)
307 }
308
309 /// Bulk-score a batch of points without touching the threshold
310 /// layer's stats. Returns [`AnomalyGrade`]s graded against the
311 /// current adaptive threshold — identical to what
312 /// [`Self::score_only`] would emit per point, but parallelised
313 /// across the batch via rayon when the `parallel` feature is
314 /// enabled.
315 ///
316 /// # Errors
317 ///
318 /// Propagates any [`Self::score_only`] error hit while
319 /// processing the batch.
320 pub fn score_only_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyGrade>> {
321 #[cfg(feature = "parallel")]
322 {
323 use rayon::prelude::*;
324 points
325 .par_iter()
326 .map(|p| self.score_only(p))
327 .collect::<RcfResult<Vec<_>>>()
328 }
329 #[cfg(not(feature = "parallel"))]
330 {
331 points.iter().map(|p| self.score_only(p)).collect()
332 }
333 }
334
335 /// Bulk per-feature attribution. Delegates to
336 /// [`RandomCutForest::attribution_many`].
337 ///
338 /// # Errors
339 ///
340 /// Same as [`RandomCutForest::attribution_many`].
341 pub fn attribution_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
342 self.forest.attribution_many(points)
343 }
344
345 /// Imputation-like forensic baseline. Delegates to
346 /// [`RandomCutForest::forensic_baseline`].
347 ///
348 /// # Errors
349 ///
350 /// Same as [`RandomCutForest::forensic_baseline`].
351 pub fn forensic_baseline(
352 &self,
353 point: &[f64; D],
354 ) -> RcfResult<crate::forensic::ForensicBaseline<D>> {
355 self.forest.forensic_baseline(point)
356 }
357
358 /// Bulk early-termination scoring. Delegates to
359 /// [`RandomCutForest::score_many_early_term`] — the threshold
360 /// layer does not alter the scoring path.
361 ///
362 /// # Errors
363 ///
364 /// Same as [`RandomCutForest::score_many_early_term`].
365 pub fn score_many_early_term(
366 &self,
367 points: &[[f64; D]],
368 config: crate::early_term::EarlyTermConfig,
369 ) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
370 self.forest.score_many_early_term(points, config)
371 }
372
373 /// Early-termination variant of the scoring path — delegates to
374 /// [`RandomCutForest::score_early_term`]. Does not update the
375 /// thresholded layer's stats (this is a read path, not a
376 /// training path).
377 ///
378 /// # Errors
379 ///
380 /// Same as [`RandomCutForest::score_early_term`].
381 pub fn score_early_term(
382 &self,
383 point: &[f64; D],
384 config: crate::early_term::EarlyTermConfig,
385 ) -> RcfResult<crate::early_term::EarlyTermScore> {
386 self.forest.score_early_term(point, config)
387 }
388
389 /// Drop every statistic and warm-up sample. The underlying forest
390 /// is left untouched — callers who want a full reset should
391 /// rebuild via the builder. Used by tests and by callers that
392 /// want to re-enter a warmup phase after a major regime change.
393 pub fn reset_stats(&mut self) {
394 self.stats.reset();
395 self.tdigest.reset();
396 }
397
398 /// Retract a previously-observed point from the underlying forest
399 /// by its `point_idx`. Delegates to
400 /// [`RandomCutForest::delete`] — the threshold layer's stats are
401 /// left untouched (they already reflect the score that was
402 /// emitted when the point was processed).
403 ///
404 /// # Errors
405 ///
406 /// Same as [`RandomCutForest::delete`].
407 pub fn delete(&mut self, point_idx: usize) -> RcfResult<bool> {
408 self.forest.delete(point_idx)
409 }
410
411 /// Retract every point whose stored value bit-matches `point`.
412 /// Delegates to [`RandomCutForest::delete_by_value`].
413 ///
414 /// # Errors
415 ///
416 /// Same as [`RandomCutForest::delete_by_value`].
417 pub fn delete_by_value(&mut self, point: &[f64; D]) -> RcfResult<usize> {
418 self.forest.delete_by_value(point)
419 }
420
421 /// Same as [`Self::process`] but returns the `point_idx` the
422 /// underlying forest assigned to the fresh observation, paired
423 /// with the usual graded verdict. Callers that want to later
424 /// retract the observation via [`Self::delete`] should store the
425 /// index from this call.
426 ///
427 /// # Errors
428 ///
429 /// Same as [`Self::process`].
430 pub fn process_indexed(&mut self, point: [f64; D]) -> RcfResult<(usize, AnomalyGrade)> {
431 ensure_finite(&point)?;
432
433 let score = match self.forest.score(&point) {
434 Ok(s) => s,
435 Err(RcfError::EmptyForest) => {
436 // Cold start: bypass the normal scoring path, mirror
437 // `process`'s warming-up verdict, and return the
438 // fresh point_idx from the underlying insert.
439 let idx = self.forest.update_indexed(point)?;
440 let grade = AnomalyGrade::new(
441 AnomalyScore::new(0.0)?,
442 self.thresholded.min_threshold,
443 0.0,
444 false,
445 false,
446 )?;
447 #[cfg(feature = "std")]
448 self.emit_process_metrics(&grade);
449 return Ok((idx, grade));
450 }
451 Err(other) => return Err(other),
452 };
453
454 let idx = self.forest.update_indexed(point)?;
455 let verdict = self.grade_from_score(score)?;
456 self.record_score(f64::from(score));
457 #[cfg(feature = "std")]
458 self.emit_process_metrics(&verdict);
459 Ok((idx, verdict))
460 }
461
462 /// Timestamped variant of [`Self::process`] — tags the freshly
463 /// inserted point with `timestamp` so callers can later prune
464 /// history via [`RandomCutForest::delete_before`]. Returns the
465 /// same graded verdict as [`Self::process`].
466 ///
467 /// # Errors
468 ///
469 /// Same as [`Self::process`].
470 pub fn process_at(&mut self, point: [f64; D], timestamp: u64) -> RcfResult<AnomalyGrade> {
471 let (_, verdict) = self.process_indexed_at(point, timestamp)?;
472 Ok(verdict)
473 }
474
475 /// Timestamped variant of [`Self::process_indexed`] — records
476 /// the caller-supplied `timestamp` against the fresh `point_idx`.
477 ///
478 /// # Errors
479 ///
480 /// Same as [`Self::process_indexed`].
481 pub fn process_indexed_at(
482 &mut self,
483 point: [f64; D],
484 timestamp: u64,
485 ) -> RcfResult<(usize, AnomalyGrade)> {
486 let (idx, verdict) = self.process_indexed(point)?;
487 // process_indexed may have called update_indexed, so the
488 // side-map entry we tag here is attached to the correct
489 // fresh point_idx — even when the call path went through
490 // the cold-start warming-up branch.
491 if self.forest.point_store().ref_count(idx) > 0 {
492 self.forest.set_point_timestamp(idx, timestamp);
493 }
494 Ok((idx, verdict))
495 }
496
497 /// Retract every point whose timestamp is strictly less than
498 /// `cutoff`. Forwards to [`RandomCutForest::delete_before`].
499 ///
500 /// # Errors
501 ///
502 /// Propagates [`RandomCutForest::delete_before`] failures.
503 pub fn delete_before(&mut self, cutoff: u64) -> RcfResult<usize> {
504 self.forest.delete_before(cutoff)
505 }
506
507 /// Emit the counters / gauges / histograms associated with a
508 /// completed `process` call. Called once per public process
509 /// entry so cold-start warming-up verdicts are counted too.
510 #[cfg(feature = "std")]
511 fn emit_process_metrics(&self, verdict: &AnomalyGrade) {
512 use crate::metrics::names;
513 self.metrics.inc_counter(names::PROCESS_TOTAL, 1);
514 self.metrics
515 .observe_histogram(names::GRADE_OBSERVATION, verdict.grade());
516 if verdict.is_anomaly() {
517 self.metrics.inc_counter(names::ANOMALIES_FIRED_TOTAL, 1);
518 }
519 self.metrics
520 .set_gauge(names::THRESHOLD_CURRENT, self.current_threshold());
521 self.metrics.set_gauge(names::EMA_MEAN, self.stats.mean());
522 self.metrics
523 .set_gauge(names::EMA_STDDEV, self.stats.stddev());
524 #[allow(clippy::cast_precision_loss)]
525 self.metrics
526 .set_gauge(names::OBSERVATIONS_SEEN, self.stats.observations() as f64);
527 }
528
529 /// Translate a raw anomaly score into a graded verdict using the
530 /// current running statistics. Handles both
531 /// [`crate::thresholded::ThresholdMode`] variants.
532 fn grade_from_score(&self, score: AnomalyScore) -> RcfResult<AnomalyGrade> {
533 use crate::thresholded::ThresholdMode;
534 if self.stats.observations() < self.thresholded.min_observations {
535 return AnomalyGrade::new(score, self.thresholded.min_threshold, 0.0, false, false);
536 }
537
538 let raw = f64::from(score);
539 let (threshold, span) = match self.thresholded.threshold_mode {
540 ThresholdMode::ZSigma { z_factor } => {
541 let stddev = self.stats.stddev();
542 if stddev <= 0.0 {
543 return AnomalyGrade::new(
544 score,
545 self.thresholded.min_threshold,
546 0.0,
547 false,
548 false,
549 );
550 }
551 let adaptive = self.stats.mean() + z_factor * stddev;
552 let t = adaptive.max(self.thresholded.min_threshold);
553 (t, z_factor * stddev)
554 }
555 ThresholdMode::Quantile { p } => {
556 let Some(q) = self.tdigest_quantile_readonly(p) else {
557 return AnomalyGrade::new(
558 score,
559 self.thresholded.min_threshold,
560 0.0,
561 false,
562 false,
563 );
564 };
565 let t = q.max(self.thresholded.min_threshold);
566 // Grade span: distance from `p` to `max` on the
567 // empirical distribution. Heavy-tail-aware: a score
568 // at the observed maximum grades 1.0 whatever the
569 // absolute magnitude.
570 let max = self.tdigest.max().unwrap_or(t);
571 let sp = (max - t).max(f64::EPSILON);
572 (t, sp)
573 }
574 };
575
576 if raw <= threshold {
577 return AnomalyGrade::new(score, threshold, 0.0, false, true);
578 }
579
580 let grade = if span > 0.0 {
581 ((raw - threshold) / span).clamp(0.0, 1.0)
582 } else {
583 1.0
584 };
585 AnomalyGrade::new(score, threshold, grade, true, true)
586 }
587
588 /// Fold `score` into both the EMA (always) and the `TDigest`
589 /// (always — a mode swap should see a populated digest). Kept
590 /// private: every entry point that previously called
591 /// `self.stats.update(f64::from(score))` must route through
592 /// here instead.
593 fn record_score(&mut self, raw: f64) {
594 self.stats.update(raw);
595 self.tdigest.record(raw);
596 }
597}
598
599impl<const D: usize> crate::forest::ForestSnapshot for ThresholdedForest<D> {
600 fn snapshot_num_trees(&self) -> usize {
601 self.forest.num_trees()
602 }
603 fn snapshot_sample_size(&self) -> usize {
604 self.forest.sample_size()
605 }
606 fn snapshot_dimension(&self) -> usize {
607 self.forest.dimension()
608 }
609 fn snapshot_live_points(&self) -> usize {
610 self.forest.point_store().live_count()
611 }
612 fn snapshot_updates_seen(&self) -> u64 {
613 self.forest.updates_seen()
614 }
615 fn snapshot_memory_estimate(&self) -> usize {
616 self.forest.memory_estimate()
617 }
618}
619
#[cfg(test)]
#[allow(clippy::float_cmp)] // Tests assert bounds on closed-form quantities.
mod tests {
    use super::*;
    use crate::thresholded::config::ThresholdedForestBuilder;

    /// Small deterministic detector with a zero threshold floor and a
    /// caller-chosen warmup length.
    fn detector<const D: usize>(min_obs: u64) -> ThresholdedForest<D> {
        ThresholdedForestBuilder::<D>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(min_obs)
            .min_threshold(0.0)
            .seed(42)
            .build()
            .unwrap()
    }

    #[test]
    fn cold_start_emits_warming_up_verdict() {
        let mut d = detector::<2>(8);
        let v = d.process([0.1, 0.2]).unwrap();
        assert!(!v.ready());
        assert!(!v.is_anomaly());
        assert_eq!(v.grade(), 0.0);
    }

    #[test]
    fn warmup_period_always_not_ready() {
        let mut d = detector::<2>(32);
        for i in 0..20 {
            let v = f64::from(i) * 0.01;
            let verdict = d.process([v, v + 0.5]).unwrap();
            assert!(!verdict.ready(), "should still be warming up at i={i}");
        }
    }

    #[test]
    fn becomes_ready_after_min_observations() {
        let mut d = detector::<2>(8);
        for i in 0..64 {
            let v = f64::from(i) * 0.01;
            d.process([v, v + 0.5]).unwrap();
        }
        // Probe an existing-ish point; observations > min_obs and
        // stddev should be > 0 after 64 updates.
        let verdict = d.process([0.64, 1.14]).unwrap();
        assert!(verdict.ready());
    }

    #[test]
    fn rejects_non_finite_point() {
        let mut d = detector::<2>(8);
        assert!(matches!(
            d.process([f64::NAN, 0.0]).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn score_only_does_not_mutate_stats() {
        let mut d = detector::<2>(4);
        for i in 0..32 {
            let v = f64::from(i) * 0.01;
            d.process([v, v + 0.5]).unwrap();
        }
        let obs_before = d.stats().observations();
        let _ = d.score_only(&[10.0, 10.0]).unwrap();
        assert_eq!(d.stats().observations(), obs_before);
    }

    #[test]
    fn outlier_grades_above_cluster_member() {
        let mut d = detector::<2>(8);
        for i in 0..128 {
            let v = f64::from(i) * 0.01;
            d.process([v, v + 0.5]).unwrap();
        }
        let cluster = d.score_only(&[0.3, 0.8]).unwrap();
        let outlier = d.score_only(&[20.0, 20.0]).unwrap();
        assert!(f64::from(outlier.score()) > f64::from(cluster.score()));
    }

    #[test]
    fn current_threshold_respects_min_floor_during_warmup() {
        let d = detector::<2>(16);
        assert_eq!(d.current_threshold(), 0.0);
    }

    #[test]
    fn quantile_threshold_mode_fires_on_tail_spike() {
        use rand::{RngExt, SeedableRng};
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(16)
            .min_threshold(0.01)
            .quantile_threshold(0.95)
            .seed(19)
            .build()
            .unwrap();
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(29);
        // Warm on in-distribution points.
        for _ in 0..512 {
            let a: f64 = rng.random();
            let b: f64 = rng.random();
            d.process([a, b]).unwrap();
        }
        let warm_threshold = d.current_threshold();
        assert!(warm_threshold > 0.01, "threshold should lift off the floor");
        // A heavy outlier should score above the p95 of the warm
        // distribution → verdict flags `is_anomaly`.
        let outlier = d.process([100.0, -100.0]).unwrap();
        assert!(outlier.ready());
        assert!(outlier.is_anomaly());
    }

    #[test]
    fn quantile_threshold_rejects_invalid_p() {
        let err = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .quantile_threshold(1.5) // out of (0, 1)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn current_threshold_above_floor_when_stddev_positive() {
        use rand::{RngExt, SeedableRng};
        // Use a non-zero min_threshold and confirm the adaptive
        // threshold rises above it once stats converge.
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(8)
            .min_threshold(0.01)
            .seed(3)
            .build()
            .unwrap();
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(17);
        for _ in 0..256 {
            let a: f64 = rng.random();
            let b: f64 = rng.random();
            d.process([a, b]).unwrap();
        }
        // Strict comparison on purpose: `current_threshold` never
        // returns less than the floor, so `>=` could not fail and
        // would not show that the adaptive term took over.
        assert!(
            d.current_threshold() > 0.01,
            "adaptive threshold should rise above the floor"
        );
    }

    #[test]
    fn attribution_forwards_to_forest() {
        let mut d = detector::<2>(4);
        for i in 0..32 {
            let v = f64::from(i) * 0.01;
            d.process([v, v + 0.5]).unwrap();
        }
        let di = d.attribution(&[10.0, 10.0]).unwrap();
        assert_eq!(di.dim(), 2);
    }

    #[test]
    fn reset_stats_sends_detector_back_to_warmup() {
        let mut d = detector::<2>(4);
        for i in 0..32 {
            let v = f64::from(i) * 0.01;
            d.process([v, v + 0.5]).unwrap();
        }
        assert!(d.stats().observations() > 0);
        d.reset_stats();
        assert_eq!(d.stats().observations(), 0);
        // Next verdict should be warming-up again.
        let v = d.process([0.5, 1.0]).unwrap();
        assert!(!v.ready());
    }

    #[test]
    fn accessors_expose_inner_state() {
        let d = detector::<4>(8);
        assert_eq!(d.forest().num_trees(), 50);
        assert_eq!(d.forest_config().sample_size, 64);
        assert_eq!(d.thresholded_config().min_observations, 8);
        assert_eq!(d.stats().observations(), 0);
    }
}