use alloc::vec::Vec;
use crate::config::RcfConfig;
use crate::domain::point::ensure_finite;
use crate::domain::{AnomalyScore, DiVector};
use crate::error::{RcfError, RcfResult};
use crate::forest::RandomCutForest;
use crate::thresholded::config::ThresholdedConfig;
use crate::thresholded::grade::AnomalyGrade;
use crate::thresholded::stats::EmaStats;
/// A [`RandomCutForest`] bundled with the running statistics needed to turn
/// raw anomaly scores into thresholded [`AnomalyGrade`] verdicts.
///
/// The `process*` methods score a point, insert it into the forest and record
/// the score in both `stats` and `tdigest`; the read-only methods only consult
/// that accumulated state.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ThresholdedForest<const D: usize> {
/// Wrapped forest that produces raw scores and stores the points.
forest: RandomCutForest<D>,
/// Thresholding configuration (validated by `from_parts`).
thresholded: ThresholdedConfig,
/// Running score statistics, created from `thresholded.score_decay`; supplies
/// the mean, standard deviation and observation count used for thresholds.
stats: EmaStats,
/// Digest of recorded scores, consulted for `ThresholdMode::Quantile`.
tdigest: crate::TDigest,
/// Metrics sink (only with the `std` feature). Skipped by serde and
/// re-created through `crate::metrics::default_sink` on deserialization.
#[cfg(feature = "std")]
#[cfg_attr(
feature = "serde",
serde(skip, default = "crate::metrics::default_sink")
)]
metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}
impl<const D: usize> ThresholdedForest<D> {
/// Assembles a detector from an already-built forest and a thresholding
/// configuration.
///
/// The score statistics and the digest start out empty; with the `std`
/// feature the metrics sink is initialised from
/// `crate::metrics::default_sink()`.
///
/// # Errors
///
/// Propagates the error from `ThresholdedConfig::validate` or from
/// `EmaStats::new(thresholded.score_decay)`.
pub fn from_parts(
    forest: RandomCutForest<D>,
    thresholded: ThresholdedConfig,
) -> RcfResult<Self> {
    thresholded.validate()?;
    // Read `score_decay` before `thresholded` is moved into the struct.
    let score_stats = EmaStats::new(thresholded.score_decay)?;
    let detector = Self {
        forest,
        thresholded,
        stats: score_stats,
        tdigest: crate::TDigest::with_default_compression(),
        #[cfg(feature = "std")]
        metrics: crate::metrics::default_sink(),
    };
    Ok(detector)
}
/// Builder-style setter: swaps in `sink` as the metrics sink and hands the
/// detector back. Only available with the `std` feature.
#[cfg(feature = "std")]
#[must_use]
pub fn with_metrics_sink(
    mut self,
    sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
) -> Self {
    // The previously installed sink handle is released right away.
    drop(std::mem::replace(&mut self.metrics, sink));
    self
}
/// Borrows the currently installed metrics sink. Only available with the
/// `std` feature.
#[cfg(feature = "std")]
#[must_use]
pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
    let Self { metrics, .. } = self;
    metrics
}
#[must_use]
pub fn forest(&self) -> &RandomCutForest<D> {
&self.forest
}
/// Borrows the configuration of the wrapped forest.
#[must_use]
pub fn forest_config(&self) -> &RcfConfig {
    let inner = &self.forest;
    inner.config()
}
#[must_use]
pub fn thresholded_config(&self) -> &ThresholdedConfig {
&self.thresholded
}
#[must_use]
pub fn stats(&self) -> &EmaStats {
&self.stats
}
/// Returns the threshold a score would currently be compared against.
///
/// The result is never below `min_threshold`. The floor itself is returned
/// while fewer than `min_observations` scores have been recorded, when the
/// z-sigma mode sees a non-positive standard deviation, or when the
/// quantile mode has no digest data to query.
#[must_use]
pub fn current_threshold(&self) -> f64 {
    use crate::thresholded::ThresholdMode;

    let floor = self.thresholded.min_threshold;
    if self.stats.observations() < self.thresholded.min_observations {
        return floor;
    }

    // `None` means "no adaptive threshold available; fall back to the floor".
    let candidate = match self.thresholded.threshold_mode {
        ThresholdMode::ZSigma { z_factor } => {
            let sigma = self.stats.stddev();
            if sigma <= 0.0 {
                None
            } else {
                Some(self.stats.mean() + z_factor * sigma)
            }
        }
        ThresholdMode::Quantile { p } => self.tdigest_quantile_readonly(p),
    };

    match candidate {
        Some(adaptive) => adaptive.max(floor),
        None => floor,
    }
}
/// Queries quantile `p` of the recorded scores without mutating the
/// detector's own digest.
///
/// Returns `None` when the digest holds no weight. Otherwise the query runs
/// on a clone of the digest, so this costs one digest copy per call.
fn tdigest_quantile_readonly(&self, p: f64) -> Option<f64> {
    let is_empty = self.tdigest.total_weight() <= 0.0;
    if is_empty {
        None
    } else {
        let mut working_copy = self.tdigest.clone();
        working_copy.quantile(p)
    }
}
/// Scores `point` against the current forest, inserts it, and records the
/// score in the running statistics.
///
/// When the forest reports `RcfError::EmptyForest` the point is still
/// inserted, but a zero-score, not-ready verdict at `min_threshold` is
/// returned and no score is recorded. With the `std` feature, metrics are
/// emitted for every successful call.
///
/// The verdict is built *before* the forest is mutated: grading only reads
/// the score statistics, so the outcome is unchanged, but a grading failure
/// can no longer leave a point inserted whose score was never recorded.
///
/// # Errors
///
/// Returns the error from `ensure_finite` for rejected coordinates, and
/// propagates any other error from scoring, grading or updating the forest.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn process(&mut self, point: [f64; D]) -> RcfResult<AnomalyGrade> {
    ensure_finite(&point)?;
    let score = match self.forest.score(&point) {
        Ok(s) => s,
        Err(RcfError::EmptyForest) => {
            // Nothing to score against yet: build the cold-start verdict
            // first, then seed the forest with the point.
            let verdict = AnomalyGrade::new(
                AnomalyScore::new(0.0)?,
                self.thresholded.min_threshold,
                0.0,
                false,
                false,
            )?;
            self.forest.update(point)?;
            #[cfg(feature = "std")]
            self.emit_process_metrics(&verdict);
            return Ok(verdict);
        }
        Err(other) => return Err(other),
    };
    // Grade against the statistics as they stood before this point, and do
    // so before any mutation so an error leaves the detector untouched.
    let verdict = self.grade_from_score(score)?;
    self.forest.update(point)?;
    self.record_score(f64::from(score));
    #[cfg(feature = "std")]
    self.emit_process_metrics(&verdict);
    Ok(verdict)
}
pub fn score_only(&self, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
match self.forest.score(point) {
Ok(score) => self.grade_from_score(score),
Err(RcfError::EmptyForest) => AnomalyGrade::new(
AnomalyScore::new(0.0)?,
self.thresholded.min_threshold,
0.0,
false,
false,
),
Err(other) => Err(other),
}
}
/// Forwards to the wrapped forest's `attribution` for `point`.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
    let inner = &self.forest;
    inner.attribution(point)
}
/// Applies [`Self::score_only`] to every entry of `points`, returning the
/// grades in input order.
///
/// With the `parallel` feature the points are graded through rayon;
/// otherwise they are graded one after another, stopping at the first
/// failure.
///
/// # Errors
///
/// Returns an error if grading any point fails.
pub fn score_only_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyGrade>> {
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        let graded: RcfResult<Vec<_>> = points
            .par_iter()
            .map(|candidate| self.score_only(candidate))
            .collect();
        graded
    }
    #[cfg(not(feature = "parallel"))]
    {
        let mut graded = Vec::with_capacity(points.len());
        for candidate in points {
            graded.push(self.score_only(candidate)?);
        }
        Ok(graded)
    }
}
/// Forwards to the wrapped forest's `attribution_many` for `points`.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn attribution_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
    let inner = &self.forest;
    inner.attribution_many(points)
}
/// Forwards to the wrapped forest's `forensic_baseline` for `point`.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn forensic_baseline(
    &self,
    point: &[f64; D],
) -> RcfResult<crate::forensic::ForensicBaseline<D>> {
    let inner = &self.forest;
    inner.forensic_baseline(point)
}
/// Forwards to the wrapped forest's `score_many_early_term`; no thresholding
/// is applied and no detector state is touched.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn score_many_early_term(
    &self,
    points: &[[f64; D]],
    config: crate::early_term::EarlyTermConfig,
) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
    let inner = &self.forest;
    inner.score_many_early_term(points, config)
}
/// Forwards to the wrapped forest's `score_early_term`; no thresholding is
/// applied and no detector state is touched.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn score_early_term(
    &self,
    point: &[f64; D],
    config: crate::early_term::EarlyTermConfig,
) -> RcfResult<crate::early_term::EarlyTermScore> {
    let inner = &self.forest;
    inner.score_early_term(point, config)
}
pub fn reset_stats(&mut self) {
self.stats.reset();
self.tdigest.reset();
}
/// Forwards to the wrapped forest's `delete` for `point_idx`. The score
/// statistics are not adjusted.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn delete(&mut self, point_idx: usize) -> RcfResult<bool> {
    let inner = &mut self.forest;
    inner.delete(point_idx)
}
/// Forwards to the wrapped forest's `delete_by_value` for `point`. The score
/// statistics are not adjusted.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn delete_by_value(&mut self, point: &[f64; D]) -> RcfResult<usize> {
    let inner = &mut self.forest;
    inner.delete_by_value(point)
}
/// Like `process`, but inserts through `update_indexed` and also returns the
/// index that call reports for the inserted point.
///
/// When the forest reports `RcfError::EmptyForest` the point is still
/// inserted, but a zero-score, not-ready verdict at `min_threshold` is
/// returned and no score is recorded. With the `std` feature, metrics are
/// emitted for every successful call.
///
/// The verdict is built *before* the forest is mutated: grading only reads
/// the score statistics, so the outcome is unchanged, but a grading failure
/// can no longer insert a point whose index is never handed to the caller.
///
/// # Errors
///
/// Returns the error from `ensure_finite` for rejected coordinates, and
/// propagates any other error from scoring, grading or updating the forest.
pub fn process_indexed(&mut self, point: [f64; D]) -> RcfResult<(usize, AnomalyGrade)> {
    ensure_finite(&point)?;
    let score = match self.forest.score(&point) {
        Ok(s) => s,
        Err(RcfError::EmptyForest) => {
            // Nothing to score against yet: build the cold-start verdict
            // first, then seed the forest with the point.
            let grade = AnomalyGrade::new(
                AnomalyScore::new(0.0)?,
                self.thresholded.min_threshold,
                0.0,
                false,
                false,
            )?;
            let idx = self.forest.update_indexed(point)?;
            #[cfg(feature = "std")]
            self.emit_process_metrics(&grade);
            return Ok((idx, grade));
        }
        Err(other) => return Err(other),
    };
    // Grade against the statistics as they stood before this point, and do
    // so before any mutation so an error leaves the detector untouched.
    let verdict = self.grade_from_score(score)?;
    let idx = self.forest.update_indexed(point)?;
    self.record_score(f64::from(score));
    #[cfg(feature = "std")]
    self.emit_process_metrics(&verdict);
    Ok((idx, verdict))
}
/// Runs [`Self::process_indexed_at`] and keeps only the verdict, discarding
/// the point index.
///
/// # Errors
///
/// Propagates any error from `process_indexed_at`.
pub fn process_at(&mut self, point: [f64; D], timestamp: u64) -> RcfResult<AnomalyGrade> {
    self.process_indexed_at(point, timestamp)
        .map(|(_, verdict)| verdict)
}
/// Runs [`Self::process_indexed`] and then tags the inserted point with
/// `timestamp`.
///
/// The timestamp is only written when the point store reports a non-zero
/// reference count for the returned index.
///
/// # Errors
///
/// Propagates any error from `process_indexed`.
pub fn process_indexed_at(
    &mut self,
    point: [f64; D],
    timestamp: u64,
) -> RcfResult<(usize, AnomalyGrade)> {
    let outcome = self.process_indexed(point)?;
    let slot = outcome.0;
    let still_referenced = self.forest.point_store().ref_count(slot) > 0;
    if still_referenced {
        self.forest.set_point_timestamp(slot, timestamp);
    }
    Ok(outcome)
}
/// Forwards to the wrapped forest's `delete_before` with `cutoff`. The score
/// statistics are not adjusted.
///
/// # Errors
///
/// Propagates whatever error the forest returns.
pub fn delete_before(&mut self, cutoff: u64) -> RcfResult<usize> {
    let inner = &mut self.forest;
    inner.delete_before(cutoff)
}
/// Publishes the per-call metrics for a freshly produced `verdict`: the
/// process counter, the grade histogram, the anomaly counter (only when the
/// verdict fired) and gauges for the current threshold, the score mean and
/// standard deviation, and the number of observations.
#[cfg(feature = "std")]
fn emit_process_metrics(&self, verdict: &AnomalyGrade) {
    use crate::metrics::names;

    let sink = &self.metrics;
    sink.inc_counter(names::PROCESS_TOTAL, 1);
    sink.observe_histogram(names::GRADE_OBSERVATION, verdict.grade());
    if verdict.is_anomaly() {
        sink.inc_counter(names::ANOMALIES_FIRED_TOTAL, 1);
    }

    sink.set_gauge(names::THRESHOLD_CURRENT, self.current_threshold());
    sink.set_gauge(names::EMA_MEAN, self.stats.mean());
    sink.set_gauge(names::EMA_STDDEV, self.stats.stddev());
    // Gauges carry f64 values; losing precision on a very large observation
    // count is accepted here.
    #[allow(clippy::cast_precision_loss)]
    let observations_seen = self.stats.observations() as f64;
    sink.set_gauge(names::OBSERVATIONS_SEEN, observations_seen);
}
/// Converts a raw `score` into an [`AnomalyGrade`] using the statistics
/// recorded so far. Nothing is mutated.
///
/// A not-ready verdict (grade 0, threshold = `min_threshold`) is produced
/// while fewer than `min_observations` scores have been recorded, when the
/// z-sigma mode sees a non-positive standard deviation, or when the
/// quantile mode has no digest data.
///
/// Otherwise the threshold is the adaptive value floored at
/// `min_threshold`. Scores at or below it grade 0 and do not fire; scores
/// above it fire with grade `(raw - threshold) / span` clamped to `[0, 1]`,
/// where `span` is `z_factor * stddev` in z-sigma mode and
/// `max(recorded maximum - threshold, f64::EPSILON)` in quantile mode.
///
/// # Errors
///
/// Propagates any error from `AnomalyGrade::new`.
fn grade_from_score(&self, score: AnomalyScore) -> RcfResult<AnomalyGrade> {
    use crate::thresholded::ThresholdMode;

    let floor = self.thresholded.min_threshold;
    // Shared "no usable threshold yet" verdict.
    let not_ready = || AnomalyGrade::new(score, floor, 0.0, false, false);

    if self.stats.observations() < self.thresholded.min_observations {
        return not_ready();
    }

    let raw = f64::from(score);
    // `(threshold, span)`, or `None` when the active mode cannot produce one.
    let resolved = match self.thresholded.threshold_mode {
        ThresholdMode::ZSigma { z_factor } => {
            let sigma = self.stats.stddev();
            if sigma <= 0.0 {
                None
            } else {
                let band = z_factor * sigma;
                let cutoff = (self.stats.mean() + band).max(floor);
                Some((cutoff, band))
            }
        }
        ThresholdMode::Quantile { p } => self.tdigest_quantile_readonly(p).map(|q| {
            let cutoff = q.max(floor);
            let observed_max = self.tdigest.max().unwrap_or(cutoff);
            (cutoff, (observed_max - cutoff).max(f64::EPSILON))
        }),
    };
    let Some((threshold, span)) = resolved else {
        return not_ready();
    };

    let (grade, fired) = if raw <= threshold {
        (0.0, false)
    } else if span > 0.0 {
        (((raw - threshold) / span).clamp(0.0, 1.0), true)
    } else {
        // Degenerate span: anything above the threshold is a full-grade hit.
        (1.0, true)
    };
    AnomalyGrade::new(score, threshold, grade, fired, true)
}
fn record_score(&mut self, raw: f64) {
self.stats.update(raw);
self.tdigest.record(raw);
}
}
/// Snapshot figures are taken straight from the wrapped forest; the
/// thresholding state contributes nothing to them.
impl<const D: usize> crate::forest::ForestSnapshot for ThresholdedForest<D> {
    /// Tree count of the wrapped forest.
    fn snapshot_num_trees(&self) -> usize {
        let inner = &self.forest;
        inner.num_trees()
    }

    /// Sample size of the wrapped forest.
    fn snapshot_sample_size(&self) -> usize {
        let inner = &self.forest;
        inner.sample_size()
    }

    /// Dimension reported by the wrapped forest.
    fn snapshot_dimension(&self) -> usize {
        let inner = &self.forest;
        inner.dimension()
    }

    /// Live-point count reported by the forest's point store.
    fn snapshot_live_points(&self) -> usize {
        let store = self.forest.point_store();
        store.live_count()
    }

    /// Update counter reported by the wrapped forest.
    fn snapshot_updates_seen(&self) -> u64 {
        let inner = &self.forest;
        inner.updates_seen()
    }

    /// Memory estimate of the wrapped forest only.
    fn snapshot_memory_estimate(&self) -> usize {
        let inner = &self.forest;
        inner.memory_estimate()
    }
}
#[cfg(test)]
// Tests compare against exact literals (0.0 grades, configured floors).
#[allow(clippy::float_cmp)]
mod tests {
    use super::*;
    use crate::thresholded::config::ThresholdedForestBuilder;

    /// Fixed-seed detector with a zero threshold floor.
    fn detector<const D: usize>(min_obs: u64) -> ThresholdedForest<D> {
        let builder = ThresholdedForestBuilder::<D>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(min_obs)
            .min_threshold(0.0)
            .seed(42);
        builder.build().unwrap()
    }

    /// Point number `i` on the diagonal ramp used by most tests.
    fn ramp_point(i: u32) -> [f64; 2] {
        let v = f64::from(i) * 0.01;
        [v, v + 0.5]
    }

    /// Processes the first `count` ramp points.
    fn feed_ramp(d: &mut ThresholdedForest<2>, count: u32) {
        for i in 0..count {
            d.process(ramp_point(i)).unwrap();
        }
    }

    /// Processes `count` points drawn from a ChaCha8 stream seeded with
    /// `rng_seed`.
    fn feed_random(d: &mut ThresholdedForest<2>, rng_seed: u64, count: usize) {
        use rand::{RngExt, SeedableRng};
        let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(rng_seed);
        for _ in 0..count {
            let a: f64 = rng.random();
            let b: f64 = rng.random();
            d.process([a, b]).unwrap();
        }
    }

    #[test]
    fn cold_start_emits_warming_up_verdict() {
        let mut d = detector::<2>(8);
        let first = d.process([0.1, 0.2]).unwrap();
        assert!(!first.ready());
        assert!(!first.is_anomaly());
        assert_eq!(first.grade(), 0.0);
    }

    #[test]
    fn warmup_period_always_not_ready() {
        let mut d = detector::<2>(32);
        for i in 0..20_u32 {
            let verdict = d.process(ramp_point(i)).unwrap();
            assert!(!verdict.ready(), "should still be warming up at i={i}");
        }
    }

    #[test]
    fn becomes_ready_after_min_observations() {
        let mut d = detector::<2>(8);
        feed_ramp(&mut d, 64);
        let verdict = d.process([0.64, 1.14]).unwrap();
        assert!(verdict.ready());
    }

    #[test]
    fn rejects_non_finite_point() {
        let mut d = detector::<2>(8);
        let outcome = d.process([f64::NAN, 0.0]);
        assert!(matches!(outcome, Err(RcfError::NaNValue)));
    }

    #[test]
    fn score_only_does_not_mutate_stats() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        let obs_before = d.stats().observations();
        let _ = d.score_only(&[10.0, 10.0]).unwrap();
        let obs_after = d.stats().observations();
        assert_eq!(obs_after, obs_before);
    }

    #[test]
    fn outlier_grades_above_cluster_member() {
        let mut d = detector::<2>(8);
        feed_ramp(&mut d, 128);
        let cluster_score = f64::from(d.score_only(&[0.3, 0.8]).unwrap().score());
        let outlier_score = f64::from(d.score_only(&[20.0, 20.0]).unwrap().score());
        assert!(outlier_score > cluster_score);
    }

    #[test]
    fn current_threshold_respects_min_floor_during_warmup() {
        let d = detector::<2>(16);
        assert_eq!(d.current_threshold(), 0.0);
    }

    #[test]
    fn quantile_threshold_mode_fires_on_tail_spike() {
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(16)
            .min_threshold(0.01)
            .quantile_threshold(0.95)
            .seed(19)
            .build()
            .unwrap();
        feed_random(&mut d, 29, 512);

        let warm_threshold = d.current_threshold();
        assert!(warm_threshold > 0.01, "threshold should lift off the floor");

        let outlier = d.process([100.0, -100.0]).unwrap();
        assert!(outlier.ready());
        assert!(outlier.is_anomaly());
    }

    #[test]
    fn quantile_threshold_rejects_invalid_p() {
        // 1.5 is not accepted as a quantile.
        let err = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .quantile_threshold(1.5)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn current_threshold_above_floor_when_stddev_positive() {
        let mut d = ThresholdedForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .min_observations(8)
            .min_threshold(0.01)
            .seed(3)
            .build()
            .unwrap();
        feed_random(&mut d, 17, 256);
        assert!(d.current_threshold() >= 0.01);
    }

    #[test]
    fn attribution_forwards_to_forest() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        let di = d.attribution(&[10.0, 10.0]).unwrap();
        assert_eq!(di.dim(), 2);
    }

    #[test]
    fn reset_stats_sends_detector_back_to_warmup() {
        let mut d = detector::<2>(4);
        feed_ramp(&mut d, 32);
        assert!(d.stats().observations() > 0);

        d.reset_stats();
        assert_eq!(d.stats().observations(), 0);

        let after_reset = d.process([0.5, 1.0]).unwrap();
        assert!(!after_reset.ready());
    }

    #[test]
    fn accessors_expose_inner_state() {
        let d = detector::<4>(8);
        assert_eq!(d.forest().num_trees(), 50);
        assert_eq!(d.forest_config().sample_size, 64);
        assert_eq!(d.thresholded_config().min_observations, 8);
        assert_eq!(d.stats().observations(), 0);
    }
}