use alloc::format;
use alloc::vec;
use alloc::vec::Vec;
#[cfg(not(feature = "std"))]
#[allow(unused_imports)]
use num_traits::Float;
use rand::{Rng, SeedableRng};
use rand_chacha::ChaCha8Rng;
use crate::config::RcfConfig;
use crate::domain::point::ensure_finite;
use crate::domain::{AnomalyScore, DiVector};
use crate::early_term::{EarlyTermConfig, EarlyTermScore};
use crate::error::{RcfError, RcfResult};
use crate::forest::point_store::PointStore;
use crate::sampler::{ReservoirSampler, SamplerOp};
use crate::tree::{PointAccessor, RandomCutTree};
use crate::visitor::{AttributionVisitor, ScalarScoreVisitor, ScoreAttributionVisitor};
/// Per-tree state kept by the forest: the tree itself, the reservoir sampler
/// that decides which point-store indices the tree holds, and the tree's own
/// RNG (seeded from the forest's master RNG in `from_config`).
type TreeSlot<const D: usize> = (RandomCutTree<D>, ReservoirSampler, ChaCha8Rng);
// Per-thread scratch buffer reused by `score_trimmed` to collect per-tree
// scores without allocating a fresh `Vec` on every call. Only available in
// `std` builds; the no_std path of `score_trimmed` allocates per call instead.
#[cfg(feature = "std")]
std::thread_local! {
static TRIMMED_SCRATCH: core::cell::RefCell<Vec<f64>> =
const { core::cell::RefCell::new(Vec::new()) };
}
/// Streaming Random Cut Forest over `D`-dimensional points.
///
/// Each tree slot bundles a tree, its reservoir sampler and a per-tree RNG;
/// all trees share one reference-counted `PointStore` holding the
/// (feature-scaled) coordinates of sampled points.
// NOTE(review): with the `serde` feature, field order is part of the
// serialized layout — reordering fields may break compatibility with
// previously serialized forests for order-sensitive formats.
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct RandomCutForest<const D: usize> {
/// Configuration the forest was built from.
config: RcfConfig,
/// One `(tree, sampler, rng)` slot per tree.
trees: Vec<TreeSlot<D>>,
/// Shared store of sampled points, reference-counted by the trees.
point_store: PointStore<D>,
/// Number of insertions performed through `insert_point` (saturating).
updates_seen: u64,
/// Dedicated rayon pool built when `config.num_threads` is set; not serialized.
#[cfg(feature = "parallel")]
#[cfg_attr(feature = "serde", serde(skip))]
pool: Option<std::sync::Arc<rayon::ThreadPool>>,
/// Metrics sink; not serialized and reset to `default_sink()` on deserialize.
#[cfg(feature = "std")]
#[cfg_attr(
feature = "serde",
serde(skip, default = "crate::metrics::default_sink")
)]
metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
/// Optional per-point timestamps keyed by point-store index, written by
/// `update_indexed_at` / `set_point_timestamp` and consumed by `delete_before`.
#[cfg_attr(feature = "serde", serde(default))]
timestamps: alloc::collections::BTreeMap<usize, u64>,
}
#[allow(clippy::missing_fields_in_debug)]
impl<const D: usize> core::fmt::Debug for RandomCutForest<D> {
    /// Summarises the forest's shape and configuration instead of dumping
    /// every tree and stored point; the omitted fields are intentional.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        let cfg = &self.config;
        let mut out = f.debug_struct("RandomCutForest");
        out.field("D", &D);
        out.field("num_trees", &cfg.num_trees);
        out.field("sample_size", &cfg.sample_size);
        out.field("time_decay", &cfg.time_decay);
        out.field("updates_seen", &self.updates_seen);
        out.field("live_points", &self.point_store.live_count());
        out.finish()
    }
}
impl<const D: usize> RandomCutForest<D> {
/// Builds a forest from `config`.
///
/// Creates `config.num_trees` tree slots. Each slot holds a fresh tree, a
/// reservoir sampler and a per-tree `ChaCha8Rng`.
///
/// Per-tree RNGs are seeded from a master RNG. The master RNG is seeded from
/// `config.seed`, or from 8 bytes of OS entropy when no seed is set.
///
/// With the `parallel` feature and `config.num_threads = Some(n)`, a
/// dedicated rayon pool of `n` threads is also built.
///
/// # Errors
///
/// - Propagates the error from `config.validate_feature_scales_dimension(D)`.
/// - Returns [`RcfError::InvalidConfig`] in three cases:
///   - OS entropy is unavailable for a seed-less config.
///   - `sample_size` does not fit in `u32`. This is checked once, even when
///     `num_trees == 0`.
///   - The rayon pool cannot be built.
/// - Propagates errors from tree, sampler and point-store construction.
pub fn from_config(config: RcfConfig) -> RcfResult<Self> {
    config.validate_feature_scales_dimension(D)?;
    let mut master = if let Some(seed) = config.seed {
        ChaCha8Rng::seed_from_u64(seed)
    } else {
        let mut bytes = [0_u8; 8];
        getrandom::fill(&mut bytes).map_err(|e| {
            RcfError::InvalidConfig(
                format!("OS RNG unavailable for seed-less forest construction: {e}").into(),
            )
        })?;
        ChaCha8Rng::seed_from_u64(u64::from_le_bytes(bytes))
    };
    // Every tree gets the same capacity, so convert (and validate) it once
    // rather than on every loop iteration.
    let tree_capacity = u32::try_from(config.sample_size).map_err(|_| {
        RcfError::InvalidConfig(
            format!("sample_size {} exceeds u32::MAX", config.sample_size).into(),
        )
    })?;
    let mut trees: Vec<TreeSlot<D>> = Vec::with_capacity(config.num_trees);
    for _ in 0..config.num_trees {
        let tree = RandomCutTree::<D>::new(tree_capacity)?;
        let sampler = ReservoirSampler::with_initial_accept_fraction(
            config.sample_size,
            config.time_decay,
            config.initial_accept_fraction,
        )?;
        // One draw per tree from the master stream: identical seeds yield
        // identical per-tree RNGs.
        let tree_rng = ChaCha8Rng::seed_from_u64(master.next_u64());
        trees.push((tree, sampler, tree_rng));
    }
    let point_store = PointStore::<D>::new()?;
    #[cfg(feature = "parallel")]
    let pool = match config.num_threads {
        Some(n) => Some(std::sync::Arc::new(
            rayon::ThreadPoolBuilder::new()
                .num_threads(n)
                .build()
                .map_err(|e| {
                    RcfError::InvalidConfig(
                        format!("rayon ThreadPool build failed for num_threads={n}: {e}")
                            .into(),
                    )
                })?,
        )),
        None => None,
    };
    Ok(Self {
        config,
        trees,
        point_store,
        updates_seen: 0,
        #[cfg(feature = "parallel")]
        pool,
        #[cfg(feature = "std")]
        metrics: crate::metrics::default_sink(),
        timestamps: alloc::collections::BTreeMap::new(),
    })
}
/// Replaces the metrics sink, consuming and returning the forest
/// (builder style).
///
/// The new sink immediately receives the `FOREST_TREES` gauge, set to the
/// current number of tree slots.
#[cfg(feature = "std")]
#[must_use]
pub fn with_metrics_sink(
    mut self,
    sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
) -> Self {
    #[allow(clippy::cast_precision_loss)]
    let tree_count = self.trees.len() as f64;
    sink.set_gauge(crate::metrics::names::FOREST_TREES, tree_count);
    self.metrics = sink;
    self
}
#[cfg(feature = "std")]
#[must_use]
pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
&self.metrics
}
#[must_use]
pub fn config(&self) -> &RcfConfig {
&self.config
}
/// Returns a copy of `point` multiplied element-wise by
/// `config.feature_scales`.
///
/// The point is returned unchanged when no scales are configured, or when
/// the scale vector's length differs from `D`.
#[must_use]
pub(crate) fn scale_point_copy(&self, point: &[f64; D]) -> [f64; D] {
    let mut scaled = *point;
    let matching_scales = self
        .config
        .feature_scales
        .as_ref()
        .filter(|scales| scales.len() == D);
    if let Some(scales) = matching_scales {
        scaled
            .iter_mut()
            .zip(scales.iter())
            .for_each(|(coord, factor)| *coord *= factor);
    }
    scaled
}
/// Number of tree slots in the forest.
#[must_use]
pub fn num_trees(&self) -> usize {
    self.trees().len()
}
/// Per-tree reservoir capacity (`config.sample_size`).
#[must_use]
pub fn sample_size(&self) -> usize {
    self.config().sample_size
}
/// Dimensionality `D` of the points this forest accepts.
#[must_use]
#[inline]
pub const fn dimension(&self) -> usize {
    D
}
/// Count of points pushed through the insertion path (saturating).
///
/// This includes the temporary probes inserted by `score_codisp` and
/// `score_codisp_many`.
#[must_use]
pub fn updates_seen(&self) -> u64 {
    self.updates_seen
}
/// Shared store holding the (feature-scaled) coordinates of sampled points.
#[must_use]
pub fn point_store(&self) -> &PointStore<D> {
    &self.point_store
}
/// All tree slots, as `(tree, sampler, rng)` tuples.
#[must_use]
pub fn trees(&self) -> &[TreeSlot<D>] {
    self.trees.as_slice()
}
/// Checks that every coordinate of `point` is finite.
///
/// In `std` builds, a rejection also bumps the `REJECTED_NAN_TOTAL` counter.
/// The error from `ensure_finite` is then returned unchanged.
#[inline]
#[cfg_attr(not(feature = "std"), allow(clippy::unused_self))]
fn ensure_finite_metered(&self, point: &[f64; D]) -> RcfResult<()> {
    let verdict = ensure_finite(point);
    #[cfg(feature = "std")]
    if verdict.is_err() {
        self.metrics
            .inc_counter(crate::metrics::names::REJECTED_NAN_TOTAL, 1);
    }
    verdict
}
/// Rough, heuristic estimate of the forest's memory footprint in bytes.
///
/// The estimate is the sum of three parts:
/// - the point store's own estimate;
/// - per tree, 16 bytes per sampler slot plus one `ChaCha8Rng`;
/// - per tree, `2 * sample_size` nodes of 48 bytes.
///
/// NOTE(review): the 16- and 48-byte constants look like assumed per-entry
/// and per-node sizes, not measured ones. Confirm them against the sampler
/// and node-store layouts.
#[must_use]
pub fn memory_estimate(&self) -> usize {
    let rng_bytes = core::mem::size_of::<ChaCha8Rng>();
    let sampler_bytes: usize = self
        .trees
        .iter()
        .map(|(_, sampler, _)| sampler.capacity() * 16 + rng_bytes)
        .sum();
    let node_bytes = self.trees.len() * (2 * self.config.sample_size * 48);
    self.point_store.memory_estimate() + sampler_bytes + node_bytes
}
/// Inserts `point` (after feature scaling), discarding the store index it
/// was assigned.
///
/// # Errors
///
/// Same as [`Self::update_indexed`].
pub fn update(&mut self, point: [f64; D]) -> RcfResult<()> {
    let scaled = self.scale_point_copy(&point);
    self.insert_point(scaled).map(|_| ())
}
/// Inserts `point` (after feature scaling) and returns its point-store index.
///
/// If no tree's sampler accepts the point, it is dropped from the store
/// again. The returned index is then not referenced by any tree.
///
/// # Errors
///
/// Returns an error for non-finite coordinates, and propagates store and
/// tree errors.
pub fn update_indexed(&mut self, point: [f64; D]) -> RcfResult<usize> {
    let scaled_point = self.scale_point_copy(&point);
    self.insert_point(scaled_point)
}
/// Like [`Self::update`], but also records `timestamp` for the point when at
/// least one tree kept it.
///
/// # Errors
///
/// Same as [`Self::update_indexed`].
pub fn update_at(&mut self, point: [f64; D], timestamp: u64) -> RcfResult<()> {
    self.update_indexed_at(point, timestamp).map(|_idx| ())
}
/// Like [`Self::update_indexed`], but also records `timestamp` for the
/// returned index.
///
/// The timestamp is recorded only when the point's store reference count is
/// non-zero after insertion.
///
/// # Errors
///
/// Same as [`Self::update_indexed`].
pub fn update_indexed_at(&mut self, point: [f64; D], timestamp: u64) -> RcfResult<usize> {
    let idx = self.update_indexed(point)?;
    let retained = self.point_store.ref_count(idx) > 0;
    if retained {
        self.timestamps.insert(idx, timestamp);
    }
    Ok(idx)
}
/// Deletes every tracked point whose timestamp is strictly below `cutoff`.
///
/// Each expired index is passed to [`Self::delete`], and its timestamp entry
/// is dropped. Returns how many of those deletions removed the point from at
/// least one tree.
///
/// # Errors
///
/// Stops at, and returns, the first error from [`Self::delete`]. Indices
/// processed before the error stay deleted.
pub fn delete_before(&mut self, cutoff: u64) -> RcfResult<usize> {
    let expired: Vec<usize> = self
        .timestamps
        .iter()
        .filter(|&(_, &ts)| ts < cutoff)
        .map(|(&idx, _)| idx)
        .collect();
    let mut deleted = 0_usize;
    for idx in expired {
        let hit = self.delete(idx)?;
        self.timestamps.remove(&idx);
        if hit {
            deleted = deleted.saturating_add(1);
        }
    }
    Ok(deleted)
}
/// Timestamp recorded for `point_idx`, if any.
#[must_use]
pub fn point_timestamp(&self, point_idx: usize) -> Option<u64> {
    let ts = self.timestamps.get(&point_idx)?;
    Some(*ts)
}
/// Smallest recorded timestamp, or `None` when none are tracked.
#[must_use]
pub fn oldest_timestamp(&self) -> Option<u64> {
    self.timestamps.values().copied().min()
}
/// Largest recorded timestamp, or `None` when none are tracked.
#[must_use]
pub fn newest_timestamp(&self) -> Option<u64> {
    self.timestamps.values().copied().max()
}
/// Number of point indices that currently have a recorded timestamp.
#[must_use]
pub fn tracked_timestamps(&self) -> usize {
    self.timestamps.keys().len()
}
/// Records (or overwrites) the timestamp for `point_idx`.
///
/// No check is made that `point_idx` refers to a live point.
pub fn set_point_timestamp(&mut self, point_idx: usize, timestamp: u64) {
    let _previous = self.timestamps.insert(point_idx, timestamp);
}
/// Inserts an already-scaled `point` into the store and offers it to every
/// tree's sampler; returns the point-store index it was assigned.
///
/// Evicted points whose reference count reached zero are freed, and their
/// timestamps dropped. If no tree accepted the new point, it is dropped from
/// the store again, but its index is still returned. `updates_seen` and (std)
/// `UPDATES_TOTAL` are bumped on success.
///
/// # Errors
///
/// Returns an error for non-finite coordinates. Propagates store and tree
/// errors.
/// NOTE(review): an error from `update_trees` leaves the new point in the
/// store, and possibly in some trees.
fn insert_point(&mut self, point: [f64; D]) -> RcfResult<usize> {
self.ensure_finite_metered(&point)?;
let new_idx = self.point_store.add(point)?;
// Clone the Arc up front so the pool stays usable after `self` is split.
#[cfg(feature = "parallel")]
let pool = self.pool.clone();
// Split the borrow: trees are mutated while the store is only read during
// the per-tree pass; the store and timestamps are updated afterwards.
let Self {
trees,
point_store,
timestamps,
..
} = self;
let store: &PointStore<D> = point_store;
#[cfg(feature = "parallel")]
let pending_frees = if let Some(p) = pool.as_deref() {
p.install(|| update_trees(trees, store, new_idx))?
} else {
update_trees(trees, store, new_idx)?
};
#[cfg(not(feature = "parallel"))]
let pending_frees = update_trees(trees, store, new_idx)?;
// Free slots only after every tree is done, so no tree still reads them.
for evicted in pending_frees {
point_store.set_free(evicted)?;
timestamps.remove(&evicted);
}
// Rejected by every sampler: release the slot that was just added.
if point_store.ref_count(new_idx) == 0 {
point_store.drop_unreferenced(new_idx)?;
timestamps.remove(&new_idx);
}
self.updates_seen = self.updates_seen.saturating_add(1);
#[cfg(feature = "std")]
self.metrics
.inc_counter(crate::metrics::names::UPDATES_TOTAL, 1);
Ok(new_idx)
}
/// Removes `point_idx` from every tree whose sampler holds it.
///
/// If any tree reports that the point's reference count reached zero, the
/// store slot is freed and its timestamp dropped. Returns `true` when at
/// least one tree held the point; std builds then bump `DELETES_TOTAL`.
///
/// # Errors
///
/// Propagates tree and store errors.
pub fn delete(&mut self, point_idx: usize) -> RcfResult<bool> {
    #[cfg(feature = "parallel")]
    let pool = self.pool.clone();
    // Split the borrow: trees are mutated while the store is only read;
    // the store and timestamps are touched afterwards.
    let Self {
        trees,
        point_store,
        timestamps,
        ..
    } = self;
    let store: &PointStore<D> = point_store;
    #[cfg(feature = "parallel")]
    let outcome = match pool.as_deref() {
        Some(p) => p.install(|| delete_from_trees(trees, store, point_idx)),
        None => delete_from_trees(trees, store, point_idx),
    };
    #[cfg(not(feature = "parallel"))]
    let outcome = delete_from_trees(trees, store, point_idx);
    let (held_anywhere, released_last_ref) = outcome?;
    if released_last_ref {
        point_store.set_free(point_idx)?;
        timestamps.remove(&point_idx);
    }
    #[cfg(feature = "std")]
    if held_anywhere {
        self.metrics
            .inc_counter(crate::metrics::names::DELETES_TOTAL, 1);
    }
    Ok(held_anywhere)
}
/// Deletes every sampled point whose stored (scaled) coordinates exactly
/// equal `point` after feature scaling.
///
/// Only store indices referenced by at least one sampler are considered.
/// Returns how many of the matching indices were removed from at least one
/// tree.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Propagates errors from
/// [`Self::delete`].
pub fn delete_by_value(&mut self, point: &[f64; D]) -> RcfResult<usize> {
    self.ensure_finite_metered(point)?;
    let target = self.scale_point_copy(point);
    let slots = self.point_store.capacity();
    let mut sampled = vec![false; slots];
    for (_, sampler, _) in &self.trees {
        for idx in sampler.iter_indices() {
            // Indices beyond the store's capacity are ignored.
            if let Some(flag) = sampled.get_mut(idx) {
                *flag = true;
            }
        }
    }
    let matches: Vec<usize> = (0..slots)
        .filter(|&idx| sampled[idx] && self.point_store.point(idx) == Some(&target))
        .collect();
    let mut removed = 0_usize;
    for idx in matches {
        if self.delete(idx)? {
            removed = removed.saturating_add(1);
        }
    }
    Ok(removed)
}
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_early_term(
&self,
point: &[f64; D],
config: EarlyTermConfig,
) -> RcfResult<EarlyTermScore> {
use crate::visitor::ScalarScoreVisitor;
self.ensure_finite_metered(point)?;
config.validate()?;
let scaled = self.scale_point_copy(point);
let probe: &[f64; D] = &scaled;
let mut mean = 0.0_f64;
let mut m2 = 0.0_f64;
let mut n: usize = 0;
let mut stderr = 0.0_f64;
let mut early = false;
let trees_available = self.trees.len();
for (tree, _, _) in &self.trees {
let Some(root) = tree.root() else {
continue;
};
let mass = tree.store().view(root)?.mass();
let visitor = ScalarScoreVisitor::new(mass);
let x: f64 = tree.traverse(probe, visitor)?.into();
n = n.saturating_add(1);
#[allow(clippy::cast_precision_loss)]
let n_f = n as f64;
let delta = x - mean;
mean += delta / n_f;
let delta2 = x - mean;
m2 += delta * delta2;
if n >= config.min_trees && n >= 2 {
#[allow(clippy::cast_precision_loss)]
let variance = m2 / (n - 1) as f64;
#[allow(clippy::cast_precision_loss)]
let stderr_now = (variance / n as f64).sqrt();
stderr = stderr_now;
let denom = mean.abs().max(f64::EPSILON);
if stderr_now / denom < config.confidence_threshold {
early = true;
break;
}
}
}
if n == 0 {
return Err(RcfError::EmptyForest);
}
let score = AnomalyScore::new(mean.max(0.0))?;
#[cfg(feature = "std")]
{
use crate::metrics::names;
self.metrics
.observe_histogram(names::SCORE_OBSERVATION, f64::from(score));
#[allow(clippy::cast_precision_loss)]
self.metrics
.observe_histogram(names::EARLY_TERM_TREES, n as f64);
if early {
self.metrics.inc_counter(names::EARLY_TERM_STOPPED_TOTAL, 1);
}
}
Ok(EarlyTermScore {
score,
trees_evaluated: n,
trees_available,
stderr,
early_stopped: early,
})
}
/// Mean per-tree anomaly score of `point` (after feature scaling), clamped
/// at 0.
///
/// Trees without a root are skipped. With the `parallel` feature, trees are
/// scored on the forest's pool when one was configured.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no tree has a root. Propagates traversal
/// errors.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score(&self, point: &[f64; D]) -> RcfResult<AnomalyScore> {
    self.ensure_finite_metered(point)?;
    let probe = self.scale_point_copy(point);
    #[cfg(feature = "parallel")]
    let aggregated = match self.pool.as_deref() {
        Some(pool) => pool.install(|| score_aggregate(&self.trees, &probe)),
        None => score_aggregate(&self.trees, &probe),
    };
    #[cfg(not(feature = "parallel"))]
    let aggregated = score_aggregate(&self.trees, &probe);
    let (total, trees_used) = aggregated?;
    if trees_used == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let average = total / trees_used as f64;
    let score = AnomalyScore::new(average.max(0.0))?;
    #[cfg(feature = "std")]
    self.metrics
        .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
    Ok(score)
}
/// Trimmed-mean anomaly score of `point`, clamped at 0.
///
/// Per-tree scores are sorted, and `floor(len * trim_fraction)` of them are
/// dropped from each end before averaging. In `std` builds a thread-local
/// scratch buffer avoids a per-call allocation.
///
/// # Errors
///
/// - [`RcfError::InvalidConfig`] unless `trim_fraction` is finite and in
///   `[0.0, 0.5)`.
/// - An error if `point` is not finite.
/// - [`RcfError::EmptyForest`] if no tree has a root.
/// - Propagated traversal errors.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_trimmed(&self, point: &[f64; D], trim_fraction: f64) -> RcfResult<AnomalyScore> {
    let in_range = trim_fraction.is_finite() && (0.0..0.5).contains(&trim_fraction);
    if !in_range {
        return Err(RcfError::InvalidConfig(
            format!("score_trimmed: trim_fraction must be in [0.0, 0.5), got {trim_fraction}")
                .into(),
        ));
    }
    self.ensure_finite_metered(point)?;
    let scaled = self.scale_point_copy(point);
    #[cfg(feature = "std")]
    let score = TRIMMED_SCRATCH.with(|cell| -> RcfResult<AnomalyScore> {
        let mut buf = cell.borrow_mut();
        buf.clear();
        buf.reserve(self.trees.len());
        self.collect_trimmed_samples(&scaled, &mut buf)?;
        Self::reduce_trimmed(&mut buf, trim_fraction)
    })?;
    #[cfg(not(feature = "std"))]
    let score = {
        let mut buf: Vec<f64> = Vec::with_capacity(self.trees.len());
        self.collect_trimmed_samples(&scaled, &mut buf)?;
        Self::reduce_trimmed(&mut buf, trim_fraction)?
    };
    #[cfg(feature = "std")]
    self.metrics
        .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
    Ok(score)
}
/// Appends one scalar score per rooted tree for the (already scaled)
/// `probe` to `samples`, in tree order.
///
/// # Errors
///
/// Propagates node-view and traversal errors.
fn collect_trimmed_samples(&self, probe: &[f64; D], samples: &mut Vec<f64>) -> RcfResult<()> {
    for tree in self.trees.iter().map(|(tree, _, _)| tree) {
        if let Some(root) = tree.root() {
            let visitor = ScalarScoreVisitor::new(tree.store().view(root)?.mass());
            samples.push(f64::from(tree.traverse(probe, visitor)?));
        }
    }
    Ok(())
}
/// Sorts `samples` ascending and returns the mean of the central part,
/// clamped at 0.
///
/// `floor(len * trim_fraction)` values are dropped from each end before
/// averaging.
///
/// # Errors
///
/// Returns [`RcfError::EmptyForest`] for an empty slice. Returns
/// [`RcfError::InvalidConfig`] if trimming would leave no samples.
fn reduce_trimmed(samples: &mut [f64], trim_fraction: f64) -> RcfResult<AnomalyScore> {
    if samples.is_empty() {
        return Err(RcfError::EmptyForest);
    }
    // `f64::total_cmp` is a total order, so the sort is well-defined even if
    // a NaN ever appears. A `partial_cmp(..).unwrap_or(Equal)` comparator is
    // not a total order, and std sorts are permitted to panic on one.
    samples.sort_unstable_by(f64::total_cmp);
    #[allow(
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    let trim_count = ((samples.len() as f64) * trim_fraction).floor() as usize;
    let lo = trim_count;
    let hi = samples.len().saturating_sub(trim_count);
    if hi <= lo {
        return Err(RcfError::InvalidConfig(
            "score_trimmed: trim_fraction too large for tree count".into(),
        ));
    }
    let slice = &samples[lo..hi];
    #[allow(clippy::cast_precision_loss)]
    let mean = slice.iter().sum::<f64>() / slice.len() as f64;
    AnomalyScore::new(mean.max(0.0))
}
/// Collusive-displacement (CoDisp) score, computed by temporarily inserting
/// `point` into the forest.
///
/// The point is inserted via `update_indexed`, so feature scaling applies.
/// Its leaf's CoDisp is averaged over every tree that accepted it, and the
/// probe is then deleted again, even if the walk failed. The result is
/// clamped at 0.
///
/// NOTE(review): the probe goes through the normal update path. It therefore
/// advances `updates_seen` and can evict other sampled points, and those
/// evictions are not undone by the delete.
///
/// # Errors
///
/// - An error if `point` is not finite or insertion fails.
/// - Any error from the CoDisp walk.
/// - Any error from removing the probe afterwards, since a probe left behind
///   would bias later scores.
/// - [`RcfError::EmptyForest`] if no tree retained the probe.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_codisp(&mut self, point: &[f64; D]) -> RcfResult<AnomalyScore> {
    self.ensure_finite_metered(point)?;
    let idx = self.update_indexed(*point)?;
    let walk_result = codisp_walk_all_trees(&self.trees, idx);
    // Always remove the probe. Report a walk error first, then a cleanup
    // error, instead of silently ignoring the latter.
    let cleanup = self.delete(idx);
    let (total, count) = walk_result?;
    cleanup?;
    if count == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let mean = total / count as f64;
    let score = AnomalyScore::new(mean.max(0.0))?;
    #[cfg(feature = "std")]
    self.metrics
        .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
    Ok(score)
}
/// CoDisp-style score of `point` without mutating the forest, clamped at 0.
///
/// Averages `codisp_stateless` over every rooted tree. With the `parallel`
/// feature, trees run on the forest's pool when one was configured.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no tree has a root. Propagates per-tree
/// errors.
pub fn score_codisp_stateless(&self, point: &[f64; D]) -> RcfResult<AnomalyScore> {
    self.ensure_finite_metered(point)?;
    let probe = self.scale_point_copy(point);
    #[cfg(feature = "parallel")]
    let aggregated = match self.pool.as_deref() {
        Some(pool) => pool.install(|| codisp_stateless_aggregate(&self.trees, &probe)),
        None => codisp_stateless_aggregate(&self.trees, &probe),
    };
    #[cfg(not(feature = "parallel"))]
    let aggregated = codisp_stateless_aggregate(&self.trees, &probe);
    let (total, trees_used) = aggregated?;
    if trees_used == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let average = total / trees_used as f64;
    let score = AnomalyScore::new(average.max(0.0))?;
    #[cfg(feature = "std")]
    self.metrics
        .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
    Ok(score)
}
/// Stateless CoDisp score for each point in `points`, in input order, via
/// [`Self::score_codisp_stateless`].
///
/// With the `parallel` feature, points are scored concurrently, on the
/// forest's own pool when one was configured.
///
/// # Errors
///
/// Returns an error if scoring any point fails.
pub fn score_codisp_stateless_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyScore>> {
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        let score_all = || -> RcfResult<Vec<AnomalyScore>> {
            points
                .par_iter()
                .map(|p| self.score_codisp_stateless(p))
                .collect()
        };
        match self.pool.as_deref() {
            Some(pool) => pool.install(score_all),
            None => score_all(),
        }
    }
    #[cfg(not(feature = "parallel"))]
    {
        let mut scores = Vec::with_capacity(points.len());
        for p in points {
            scores.push(self.score_codisp_stateless(p)?);
        }
        Ok(scores)
    }
}
/// Batch CoDisp: inserts every point as a probe, walks all trees once, then
/// deletes every probe again. Scores are returned in input order.
///
/// NOTE(review): probes are inserted one after another through the normal
/// update path. Later probes can therefore evict earlier probes or real
/// points. If the store reuses a freed slot, two probes could share an
/// index — confirm the store's index-reuse policy.
///
/// # Errors
///
/// - An error if any point is not finite, or if an insertion fails (probes
///   inserted so far are removed best-effort).
/// - A walk error, or else the first error from removing a probe.
/// - [`RcfError::EmptyForest`] if some probe was retained by no tree.
#[cfg(feature = "std")]
pub fn score_codisp_many(&mut self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyScore>> {
    if points.is_empty() {
        return Ok(Vec::new());
    }
    for p in points {
        self.ensure_finite_metered(p)?;
    }
    let mut probe_indices: Vec<usize> = Vec::with_capacity(points.len());
    for p in points {
        match self.update_indexed(*p) {
            Ok(idx) => probe_indices.push(idx),
            Err(e) => {
                // Best-effort rollback: the insertion error is the one reported.
                for idx in &probe_indices {
                    let _ = self.delete(*idx);
                }
                return Err(e);
            }
        }
    }
    let n = points.len();
    let walked = codisp_many_walks_all_trees(&self.trees, &probe_indices, n);
    // Remove every probe even if the walk failed. Keep the first cleanup
    // error instead of silently discarding it.
    let mut cleanup_err: Option<RcfError> = None;
    for &idx in &probe_indices {
        if let Err(e) = self.delete(idx) {
            if cleanup_err.is_none() {
                cleanup_err = Some(e);
            }
        }
    }
    let (totals, counts) = walked?;
    if let Some(e) = cleanup_err {
        return Err(e);
    }
    let mut out = Vec::with_capacity(n);
    for (total, count) in totals.into_iter().zip(counts) {
        if count == 0 {
            return Err(RcfError::EmptyForest);
        }
        #[allow(clippy::cast_precision_loss)]
        let mean = total / count as f64;
        let score = AnomalyScore::new(mean.max(0.0))?;
        self.metrics
            .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
        out.push(score);
    }
    Ok(out)
}
/// Mean per-tree score of `point`, together with its spread across trees.
///
/// `stddev` is the sample standard deviation of the per-tree scores (0 for
/// a single tree), and `stderr = stddev / sqrt(trees_evaluated)`. The score
/// is clamped at 0. Trees without a root are skipped.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no tree has a root. Propagates traversal
/// errors.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_with_confidence(
    &self,
    point: &[f64; D],
) -> RcfResult<crate::score_ci::ScoreWithConfidence> {
    self.ensure_finite_metered(point)?;
    let scaled = self.scale_point_copy(point);
    // Same per-tree traversal as the trimmed scorer, so reuse its collector.
    let mut per_tree: Vec<f64> = Vec::with_capacity(self.trees.len());
    self.collect_trimmed_samples(&scaled, &mut per_tree)?;
    let trees_evaluated = per_tree.len();
    if trees_evaluated == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let count = trees_evaluated as f64;
    let mean = per_tree.iter().sum::<f64>() / count;
    #[allow(clippy::cast_precision_loss)]
    let variance = if trees_evaluated < 2 {
        0.0
    } else {
        let squared: f64 = per_tree.iter().map(|x| (x - mean).powi(2)).sum();
        squared / (trees_evaluated - 1) as f64
    };
    let stddev = variance.sqrt();
    let stderr = stddev / count.sqrt();
    let score = AnomalyScore::new(mean.max(0.0))?;
    #[cfg(feature = "std")]
    self.metrics
        .observe_histogram(crate::metrics::names::SCORE_OBSERVATION, f64::from(score));
    Ok(crate::score_ci::ScoreWithConfidence {
        score,
        trees_evaluated,
        stddev,
        stderr,
    })
}
/// Per-dimension attribution (`DiVector`) for `point`, averaged over every
/// rooted tree.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no tree has a root. Propagates traversal and
/// accumulation errors.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn attribution(&self, point: &[f64; D]) -> RcfResult<DiVector> {
    self.ensure_finite_metered(point)?;
    let probe = self.scale_point_copy(point);
    #[cfg(feature = "parallel")]
    let aggregated = match self.pool.as_deref() {
        Some(pool) => pool.install(|| attribution_aggregate::<D>(&self.trees, &probe)),
        None => attribution_aggregate::<D>(&self.trees, &probe),
    };
    #[cfg(not(feature = "parallel"))]
    let aggregated = attribution_aggregate::<D>(&self.trees, &probe);
    let (mut di, trees_used) = aggregated?;
    if trees_used == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let divisor = trees_used as f64;
    di.scale(divisor)?;
    #[cfg(feature = "std")]
    self.metrics
        .inc_counter(crate::metrics::names::ATTRIBUTION_TOTAL, 1);
    Ok(di)
}
/// Mean score and averaged attribution for `point`, computed in a single
/// traversal per tree.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no tree has a root. Propagates traversal and
/// accumulation errors.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_and_attribution(&self, point: &[f64; D]) -> RcfResult<(AnomalyScore, DiVector)> {
    self.ensure_finite_metered(point)?;
    let probe = self.scale_point_copy(point);
    #[cfg(feature = "parallel")]
    let aggregated = match self.pool.as_deref() {
        Some(pool) => pool.install(|| score_attribution_aggregate::<D>(&self.trees, &probe)),
        None => score_attribution_aggregate::<D>(&self.trees, &probe),
    };
    #[cfg(not(feature = "parallel"))]
    let aggregated = score_attribution_aggregate::<D>(&self.trees, &probe);
    let (total, mut di, trees_used) = aggregated?;
    if trees_used == 0 {
        return Err(RcfError::EmptyForest);
    }
    #[allow(clippy::cast_precision_loss)]
    let divisor = trees_used as f64;
    let score = AnomalyScore::new((total / divisor).max(0.0))?;
    di.scale(divisor)?;
    #[cfg(feature = "std")]
    {
        use crate::metrics::names;
        self.metrics
            .observe_histogram(names::SCORE_OBSERVATION, f64::from(score));
        self.metrics.inc_counter(names::ATTRIBUTION_TOTAL, 1);
    }
    Ok((score, di))
}
/// Scores every point with [`Self::score`]; results are in input order.
///
/// With the `parallel` feature, points are scored concurrently, on the
/// forest's own pool when one was configured.
///
/// # Errors
///
/// Returns an error if scoring any point fails.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<AnomalyScore>> {
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        let score_all = || -> RcfResult<Vec<AnomalyScore>> {
            points.par_iter().map(|p| self.score(p)).collect()
        };
        match self.pool.as_deref() {
            Some(pool) => pool.install(score_all),
            None => score_all(),
        }
    }
    #[cfg(not(feature = "parallel"))]
    {
        let mut scores = Vec::with_capacity(points.len());
        for p in points {
            scores.push(self.score(p)?);
        }
        Ok(scores)
    }
}
/// Scores `points` sequentially, handing each `(index, score)` to
/// `on_score` instead of collecting a `Vec`.
///
/// # Errors
///
/// Stops at, and returns, the first scoring error. Callbacks already made
/// are not undone.
pub fn score_many_with<F>(&self, points: &[[f64; D]], mut on_score: F) -> RcfResult<()>
where
    F: FnMut(usize, AnomalyScore),
{
    points
        .iter()
        .enumerate()
        .try_for_each(|(position, p)| -> RcfResult<()> {
            let score = self.score(p)?;
            on_score(position, score);
            Ok(())
        })
}
/// Early-terminating score for each point in `points`, in input order, via
/// [`Self::score_early_term`].
///
/// # Errors
///
/// Returns an error if `config` fails validation (checked once up front) or
/// if scoring any point fails.
#[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
pub fn score_many_early_term(
    &self,
    points: &[[f64; D]],
    config: EarlyTermConfig,
) -> RcfResult<Vec<EarlyTermScore>> {
    config.validate()?;
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        let score_all = || -> RcfResult<Vec<EarlyTermScore>> {
            points
                .par_iter()
                .map(|p| self.score_early_term(p, config))
                .collect()
        };
        match self.pool.as_deref() {
            Some(pool) => pool.install(score_all),
            None => score_all(),
        }
    }
    #[cfg(not(feature = "parallel"))]
    {
        let mut scores = Vec::with_capacity(points.len());
        for p in points {
            scores.push(self.score_early_term(p, config)?);
        }
        Ok(scores)
    }
}
/// Attribution for each point in `points`, in input order, via
/// [`Self::attribution`].
///
/// # Errors
///
/// Returns an error if any point's attribution fails.
pub fn attribution_many(&self, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
    #[cfg(feature = "parallel")]
    {
        use rayon::prelude::*;
        let attribute_all = || -> RcfResult<Vec<DiVector>> {
            points.par_iter().map(|p| self.attribution(p)).collect()
        };
        match self.pool.as_deref() {
            Some(pool) => pool.install(attribute_all),
            None => attribute_all(),
        }
    }
    #[cfg(not(feature = "parallel"))]
    {
        let mut vectors = Vec::with_capacity(points.len());
        for p in points {
            vectors.push(self.attribution(p)?);
        }
        Ok(vectors)
    }
}
/// Compares `point` (raw, unscaled units) with the distribution of points
/// currently sampled by the forest.
///
/// Each store index referenced by at least one sampler is counted once. A
/// per-dimension mean and sample standard deviation (Welford) are computed
/// over those points in scaled space. They are mapped back to raw units by
/// dividing by the feature scale; scales with magnitude ≤ `f64::EPSILON` are
/// treated as 1.
///
/// `delta = observed - expected`, and `zscore = delta / stddev`, or 0 where
/// `stddev` is 0.
///
/// # Errors
///
/// Returns an error if `point` is not finite. Returns
/// [`RcfError::EmptyForest`] if no sampled point is found in the store.
pub fn forensic_baseline(
    &self,
    point: &[f64; D],
) -> RcfResult<crate::forensic::ForensicBaseline<D>> {
    self.ensure_finite_metered(point)?;
    // Deduplicate across trees: a point sampled by several trees counts once.
    let slots = self.point_store.capacity();
    let mut sampled = vec![false; slots];
    for (_, sampler, _) in &self.trees {
        for idx in sampler.iter_indices() {
            if let Some(flag) = sampled.get_mut(idx) {
                *flag = true;
            }
        }
    }
    if !sampled.contains(&true) {
        return Err(RcfError::EmptyForest);
    }
    // Welford's online mean / sum of squared deviations, in scaled space.
    let mut live = 0_usize;
    let mut mean = [0.0_f64; D];
    let mut sq_dev = [0.0_f64; D];
    let resident = sampled
        .iter()
        .enumerate()
        .filter(|&(_, &hit)| hit)
        .filter_map(|(idx, _)| self.point_store.point(idx));
    for coords in resident {
        live = live.saturating_add(1);
        #[allow(clippy::cast_precision_loss)]
        let live_f = live as f64;
        for ((m, s), &x) in mean.iter_mut().zip(sq_dev.iter_mut()).zip(coords.iter()) {
            let before = x - *m;
            *m += before / live_f;
            *s += before * (x - *m);
        }
    }
    if live == 0 {
        return Err(RcfError::EmptyForest);
    }
    let scales = self.config.feature_scales.as_deref();
    let observed = *point;
    let mut expected = [0.0_f64; D];
    let mut stddev = [0.0_f64; D];
    let mut delta = [0.0_f64; D];
    let mut zscore = [0.0_f64; D];
    for d in 0..D {
        let scale = scales
            .and_then(|s| s.get(d).copied())
            .filter(|s| s.abs() > f64::EPSILON)
            .unwrap_or(1.0);
        expected[d] = mean[d] / scale;
        #[allow(clippy::cast_precision_loss)]
        let variance = if live < 2 {
            0.0
        } else {
            sq_dev[d] / (live - 1) as f64
        };
        stddev[d] = variance.sqrt() / scale.abs();
        delta[d] = observed[d] - expected[d];
        zscore[d] = if stddev[d] > 0.0 {
            delta[d] / stddev[d]
        } else {
            0.0
        };
    }
    Ok(crate::forensic::ForensicBaseline {
        observed,
        expected,
        stddev,
        delta,
        zscore,
        live_points: live,
    })
}
}
/// Offers `new_idx` to every tree's sampler and applies the resulting insert
/// or replacement to that tree.
///
/// Returns the evicted indices whose store reference count reached zero, so
/// the caller can free them once every tree is done. With the `parallel`
/// feature, trees are processed in chunks (roughly one per rayon thread), and
/// the per-chunk lists are concatenated in chunk order.
///
/// # Errors
///
/// Propagates an error from `process_tree_update`.
/// NOTE(review): on error, other trees (or chunks) may already have been
/// updated.
fn update_trees<const D: usize>(
trees: &mut [TreeSlot<D>],
store: &PointStore<D>,
new_idx: usize,
) -> RcfResult<Vec<usize>> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
// At least 1 so an empty forest does not produce a zero chunk size.
let chunk_size = trees.len().div_ceil(rayon::current_num_threads()).max(1);
let chunks: RcfResult<Vec<Vec<usize>>> = trees
.par_chunks_mut(chunk_size)
.map(|chunk| -> RcfResult<Vec<usize>> {
let mut local = Vec::new();
for slot in chunk {
let mut freed = process_tree_update(slot, store, new_idx)?;
local.append(&mut freed);
}
Ok(local)
})
.collect();
let mut flat = Vec::new();
for c in chunks? {
flat.extend(c);
}
Ok(flat)
}
#[cfg(not(feature = "parallel"))]
{
let mut out = Vec::new();
for slot in trees.iter_mut() {
let mut local = process_tree_update(slot, store, new_idx)?;
out.append(&mut local);
}
Ok(out)
}
}
/// Removes `point_idx` from every tree whose sampler holds it.
///
/// Returns a pair of flags:
/// - `removed_from_any`: some sampler held the index;
/// - `went_to_zero`: some tree's `decr_ref` reported that the store
///   reference count reached zero.
///
/// With the `parallel` feature, trees are processed in chunks and the flags
/// are OR-combined.
///
/// # Errors
///
/// Propagates an error from `process_tree_delete`.
fn delete_from_trees<const D: usize>(
trees: &mut [TreeSlot<D>],
store: &PointStore<D>,
point_idx: usize,
) -> RcfResult<(bool, bool)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
let chunk_size = trees.len().div_ceil(rayon::current_num_threads()).max(1);
let partials: RcfResult<Vec<(bool, bool)>> = trees
.par_chunks_mut(chunk_size)
.map(|chunk| -> RcfResult<(bool, bool)> {
let mut any = false;
let mut zero = false;
for slot in chunk {
let (a, z) = process_tree_delete(slot, store, point_idx)?;
any |= a;
zero |= z;
}
Ok((any, zero))
})
.collect();
let parts = partials?;
Ok(parts
.into_iter()
.fold((false, false), |(a1, z1), (a2, z2)| (a1 | a2, z1 | z2)))
}
#[cfg(not(feature = "parallel"))]
{
let mut any = false;
let mut zero = false;
for slot in trees.iter_mut() {
let (a, z) = process_tree_delete(slot, store, point_idx)?;
any |= a;
zero |= z;
}
Ok((any, zero))
}
}
/// Removes `point_idx` from one tree slot, if its sampler holds it.
///
/// Returns `(held, hit_zero)`. `held` is `true` when the sampler held the
/// index. `hit_zero` is the result of releasing the tree's store reference.
/// A sampler that did not hold the index yields `(false, false)`.
///
/// # Errors
///
/// Propagates tree-delete and `decr_ref` errors.
fn process_tree_delete<const D: usize>(
    slot: &mut TreeSlot<D>,
    store: &PointStore<D>,
    point_idx: usize,
) -> RcfResult<(bool, bool)> {
    let (tree, sampler, _rng) = slot;
    if !sampler.remove(point_idx) {
        return Ok((false, false));
    }
    tree.delete(point_idx, store)?;
    store.decr_ref(point_idx).map(|hit_zero| (true, hit_zero))
}
/// Offers `new_idx` to one tree slot's sampler and applies the outcome.
///
/// - `Rejected`: nothing changes.
/// - `Inserted`: the point is added to the tree and its store reference count
///   incremented.
/// - `Replaced(evicted)`: the evicted point is first removed from the tree
///   and its reference released, then the new point is added as above.
///
/// Returns the evicted index when its reference count reached zero, so the
/// caller can free it.
///
/// # Errors
///
/// Propagates tree and store reference-count errors.
///
/// # Panics
///
/// Panics if `new_idx` is missing from the store; callers add it first.
fn process_tree_update<const D: usize>(
    slot: &mut TreeSlot<D>,
    store: &PointStore<D>,
    new_idx: usize,
) -> RcfResult<Vec<usize>> {
    let (tree, sampler, rng) = slot;
    let evicted = match sampler.accept(new_idx, rng) {
        SamplerOp::Rejected => return Ok(Vec::new()),
        SamplerOp::Inserted => None,
        SamplerOp::Replaced(old) => Some(old),
    };
    let mut freed = Vec::new();
    if let Some(old) = evicted {
        tree.delete(old, store)?;
        if store.decr_ref(old)? {
            freed.push(old);
        }
    }
    let p = store
        .point(new_idx)
        .expect("just-added point must be present");
    tree.add(new_idx, p, store, rng)?;
    store.incr_ref(new_idx)?;
    Ok(freed)
}
/// For each probe index, sums its CoDisp over every tree that holds it and
/// counts those trees.
///
/// Returns `(totals, counts)`, both of length `n` and aligned with
/// `probe_indices`. Within one tree, CoDisp is cached per leaf, so probes
/// that land on the same leaf share one walk. With the `parallel` feature,
/// per-tree vectors are combined with `try_reduce`.
/// NOTE(review): assumes `n == probe_indices.len()`; a smaller `n` would
/// panic on indexing.
///
/// # Errors
///
/// Propagates an error from `walk_codisp`.
#[cfg(feature = "std")]
fn codisp_many_walks_all_trees<const D: usize>(
trees: &[TreeSlot<D>],
probe_indices: &[usize],
n: usize,
) -> RcfResult<(Vec<f64>, Vec<usize>)> {
type PerTree = (Vec<f64>, Vec<usize>);
// Per-tree pass: CoDisp for every probe present in this tree.
let per_tree_fn = |tree: &RandomCutTree<D>| -> RcfResult<PerTree> {
use alloc::collections::BTreeMap;
let mut totals = vec![0.0_f64; n];
let mut counts = vec![0_usize; n];
let mut leaf_cache: BTreeMap<crate::tree::NodeRef, f64> = BTreeMap::new();
for (i, &idx) in probe_indices.iter().enumerate() {
let Some(leaf) = tree.leaf_of(idx) else {
continue;
};
let codisp = if let Some(hit) = leaf_cache.get(&leaf) {
*hit
} else {
let c = walk_codisp(tree.store(), leaf)?;
leaf_cache.insert(leaf, c);
c
};
totals[i] += codisp;
counts[i] = counts[i].saturating_add(1);
}
Ok((totals, counts))
};
// Element-wise merge of two per-tree results.
let reduce_pair = |mut a: PerTree, b: PerTree| -> PerTree {
for i in 0..n {
a.0[i] += b.0[i];
a.1[i] = a.1[i].saturating_add(b.1[i]);
}
a
};
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| per_tree_fn(tree))
.try_reduce(
|| (vec![0.0_f64; n], vec![0_usize; n]),
|a, b| Ok(reduce_pair(a, b)),
)
}
#[cfg(not(feature = "parallel"))]
{
let mut acc: PerTree = (vec![0.0_f64; n], vec![0_usize; n]);
for (tree, _, _) in trees {
acc = reduce_pair(acc, per_tree_fn(tree)?);
}
Ok(acc)
}
}
/// Sums the CoDisp of `idx` over every tree that holds it. Returns
/// `(total, trees_holding_idx)`.
///
/// With the `parallel` feature the sum is built by rayon `try_fold` /
/// `try_reduce`. The floating-point summation order therefore depends on how
/// the work is split, and may differ in the last bits from the sequential
/// build.
///
/// # Errors
///
/// Propagates an error from `walk_codisp`.
fn codisp_walk_all_trees<const D: usize>(
trees: &[TreeSlot<D>],
idx: usize,
) -> RcfResult<(f64, usize)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| -> RcfResult<Option<f64>> {
let Some(leaf) = tree.leaf_of(idx) else {
return Ok(None);
};
walk_codisp(tree.store(), leaf).map(Some)
})
.try_fold(
|| (0.0_f64, 0_usize),
|(t, c), step| {
let s = step?;
Ok::<_, RcfError>(match s {
Some(v) => (t + v, c + 1),
None => (t, c),
})
},
)
.try_reduce(
|| (0.0_f64, 0_usize),
|(t1, c1), (t2, c2)| Ok((t1 + t2, c1 + c2)),
)
}
#[cfg(not(feature = "parallel"))]
{
let mut total = 0.0_f64;
let mut count = 0_usize;
for (tree, _, _) in trees {
let Some(leaf) = tree.leaf_of(idx) else {
continue;
};
total += walk_codisp(tree.store(), leaf)?;
count = count.saturating_add(1);
}
Ok((total, count))
}
}
/// Sums `tree.codisp_stateless(point)` over every rooted tree. Returns
/// `(total, rooted_trees)`.
///
/// Trees without a root are skipped. Under `parallel`, the summation order
/// depends on rayon's work split.
///
/// # Errors
///
/// Propagates an error from `codisp_stateless`.
fn codisp_stateless_aggregate<const D: usize>(
trees: &[TreeSlot<D>],
point: &[f64; D],
) -> RcfResult<(f64, usize)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| -> RcfResult<Option<f64>> {
if tree.root().is_none() {
return Ok(None);
}
let c = tree.codisp_stateless(point)?;
Ok(Some(c))
})
.try_fold(
|| (0.0_f64, 0_usize),
|(t, c), step| {
let s = step?;
Ok::<_, RcfError>(match s {
Some(v) => (t + v, c + 1),
None => (t, c),
})
},
)
.try_reduce(
|| (0.0_f64, 0_usize),
|(t1, c1), (t2, c2)| Ok((t1 + t2, c1 + c2)),
)
}
#[cfg(not(feature = "parallel"))]
{
let mut total = 0.0_f64;
let mut count = 0_usize;
for (tree, _, _) in trees {
if tree.root().is_none() {
continue;
}
total += tree.codisp_stateless(point)?;
count += 1;
}
Ok((total, count))
}
}
/// Walks from `leaf` up to the root and returns the largest ratio
/// `sibling_mass / current_mass` seen along the way. Returns 0.0 for a
/// leaf that is the root.
///
/// The walk stops early if a node on the path reports zero mass.
///
/// # Errors
///
/// Propagates node-store lookup errors.
fn walk_codisp<const D: usize>(
    store: &crate::tree::NodeStore<D>,
    leaf: crate::tree::NodeRef,
) -> RcfResult<f64> {
    use crate::tree::NodeView;
    // Mass of any node, whether internal or leaf.
    let mass_of = |node: crate::tree::NodeRef| -> RcfResult<_> {
        Ok(match store.view(node)? {
            NodeView::Internal(i) => i.mass,
            NodeView::Leaf(l) => l.mass,
        })
    };
    let mut best = 0.0_f64;
    let mut node = leaf;
    while let Some(parent_ref) = store.parent(node)? {
        let parent = store.internal(parent_ref)?;
        let sibling = if parent.left.raw() == node.raw() {
            parent.right
        } else {
            parent.left
        };
        let sibling_mass = mass_of(sibling)?;
        let own_mass = mass_of(node)?;
        if own_mass == 0 {
            break;
        }
        #[allow(clippy::cast_precision_loss)]
        let ratio = sibling_mass as f64 / own_mass as f64;
        if ratio > best {
            best = ratio;
        }
        node = parent_ref;
    }
    Ok(best)
}
/// Sums the scalar anomaly score of `point` over every rooted tree. Returns
/// `(total, rooted_trees)`.
///
/// Each tree is traversed with a `ScalarScoreVisitor` sized by the root's
/// mass. Under `parallel`, the floating-point summation order depends on
/// rayon's work split.
///
/// # Errors
///
/// Propagates node-view and traversal errors.
fn score_aggregate<const D: usize>(
trees: &[TreeSlot<D>],
point: &[f64; D],
) -> RcfResult<(f64, usize)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| -> RcfResult<Option<f64>> {
let Some(root) = tree.root() else {
return Ok(None);
};
let mass = tree.store().view(root)?.mass();
let visitor = ScalarScoreVisitor::new(mass);
let s = tree.traverse(point, visitor)?;
Ok(Some(f64::from(s)))
})
.try_fold(
|| (0.0_f64, 0_usize),
|(t, c), step| {
let s = step?;
Ok::<_, RcfError>(match s {
Some(v) => (t + v, c + 1),
None => (t, c),
})
},
)
.try_reduce(
|| (0.0_f64, 0_usize),
|(t1, c1), (t2, c2)| Ok((t1 + t2, c1 + c2)),
)
}
#[cfg(not(feature = "parallel"))]
{
let mut total = 0.0_f64;
let mut count = 0_usize;
for (tree, _, _) in trees {
let Some(root) = tree.root() else {
continue;
};
let mass = tree.store().view(root)?.mass();
let visitor = ScalarScoreVisitor::new(mass);
let s = tree.traverse(point, visitor)?;
total += f64::from(s);
count += 1;
}
Ok((total, count))
}
}
/// Accumulates the per-tree attribution `DiVector` of `point` over every
/// rooted tree. Returns `(sum, rooted_trees)`.
///
/// The caller divides by the count. Under `parallel`, partial sums are
/// merged with `DiVector::accumulate` in `try_reduce`.
///
/// # Errors
///
/// Propagates visitor construction, traversal and accumulation errors.
fn attribution_aggregate<const D: usize>(
trees: &[TreeSlot<D>],
point: &[f64; D],
) -> RcfResult<(DiVector, usize)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| -> RcfResult<Option<DiVector>> {
let Some(root) = tree.root() else {
return Ok(None);
};
let mass = tree.store().view(root)?.mass();
let visitor = AttributionVisitor::new(point, mass)?;
Ok(Some(tree.traverse(point, visitor)?))
})
.try_fold(
|| (DiVector::zeros(D), 0_usize),
|(mut acc, c), step| {
if let Some(di) = step? {
acc.accumulate(&di)?;
Ok::<_, RcfError>((acc, c + 1))
} else {
Ok((acc, c))
}
},
)
.try_reduce(
|| (DiVector::zeros(D), 0_usize),
|(mut a1, c1), (a2, c2)| {
a1.accumulate(&a2)?;
Ok((a1, c1 + c2))
},
)
}
#[cfg(not(feature = "parallel"))]
{
let mut accumulator = DiVector::zeros(D);
let mut count = 0_usize;
for (tree, _, _) in trees {
let Some(root) = tree.root() else {
continue;
};
let mass = tree.store().view(root)?.mass();
let visitor = AttributionVisitor::new(point, mass)?;
let di = tree.traverse(point, visitor)?;
accumulator.accumulate(&di)?;
count += 1;
}
Ok((accumulator, count))
}
}
fn score_attribution_aggregate<const D: usize>(
trees: &[TreeSlot<D>],
point: &[f64; D],
) -> RcfResult<(f64, DiVector, usize)> {
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
trees
.par_iter()
.map(|(tree, _, _)| -> RcfResult<Option<(f64, DiVector)>> {
let Some(root) = tree.root() else {
return Ok(None);
};
let mass = tree.store().view(root)?.mass();
let visitor = ScoreAttributionVisitor::new(point, mass)?;
let (s, di) = tree.traverse(point, visitor)?;
Ok(Some((f64::from(s), di)))
})
.try_fold(
|| (0.0_f64, DiVector::zeros(D), 0_usize),
|(t, mut acc, c), step| match step? {
Some((s, di)) => {
acc.accumulate(&di)?;
Ok::<_, RcfError>((t + s, acc, c + 1))
}
None => Ok((t, acc, c)),
},
)
.try_reduce(
|| (0.0_f64, DiVector::zeros(D), 0_usize),
|(t1, mut a1, c1), (t2, a2, c2)| {
a1.accumulate(&a2)?;
Ok((t1 + t2, a1, c1 + c2))
},
)
}
#[cfg(not(feature = "parallel"))]
{
let mut total = 0.0_f64;
let mut accumulator = DiVector::zeros(D);
let mut count = 0_usize;
for (tree, _, _) in trees {
let Some(root) = tree.root() else {
continue;
};
let mass = tree.store().view(root)?.mass();
let visitor = ScoreAttributionVisitor::new(point, mass)?;
let (s, di) = tree.traverse(point, visitor)?;
total += f64::from(s);
accumulator.accumulate(&di)?;
count += 1;
}
Ok((total, accumulator, count))
}
}
// Compile-time guard: the forest and its point store must be shareable and
// movable across threads. This closure is never called; it only has to
// type-check, so adding a non-`Send`/`Sync` field breaks the build here.
const _: fn() = || {
    fn require_send_sync<T: Send + Sync>() {}
    require_send_sync::<RandomCutForest<4>>();
    require_send_sync::<PointStore<4>>();
};
#[cfg(test)]
#[allow(clippy::float_cmp, clippy::cast_precision_loss, clippy::cast_lossless)]
mod tests {
    use super::*;
    use crate::config::ForestBuilder;
    use rand::RngExt;

    /// Baseline 2-D forest used by most tests: 50 trees, sample size 16, and
    /// a fixed seed so every run is reproducible.
    fn small_forest() -> RandomCutForest<2> {
        ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(16)
            .seed(2026)
            .build()
            .expect("AWS-conformant config")
    }

    /// Streams `n` points along the ramp `[0.01 * i, 0.01 * i + 0.5]`.
    fn feed_ramp(f: &mut RandomCutForest<2>, n: usize) {
        for i in 0..n {
            let v = i as f64 * 0.01;
            f.update([v, v + 0.5]).unwrap();
        }
    }

    /// Streams the single point `[0.1, 0.2]` into the forest `n` times.
    fn feed_constant(f: &mut RandomCutForest<2>, n: usize) {
        for _ in 0..n {
            f.update([0.1, 0.2]).unwrap();
        }
    }

    /// `small_forest()` after 200 ramp updates.
    fn trained_forest() -> RandomCutForest<2> {
        let mut f = small_forest();
        feed_ramp(&mut f, 200);
        f
    }

    #[test]
    fn from_config_constructs_n_trees() {
        let forest = small_forest();
        assert_eq!(forest.num_trees(), 50);
        assert_eq!(forest.trees().len(), 50);
        assert_eq!(forest.sample_size(), 16);
        assert_eq!(forest.dimension(), 2);
        assert_eq!(forest.updates_seen(), 0);
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn dedicated_thread_pool_runs_score_and_update() {
        let mut f = ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(16)
            .seed(2026)
            .num_threads(2)
            .build()
            .expect("custom pool builds");
        feed_ramp(&mut f, 100);
        let score: f64 = f.score(&[5.0, 5.0]).unwrap().into();
        assert!(score >= 0.0);
        assert_eq!(f.config().num_threads, Some(2));
    }

    #[cfg(feature = "parallel")]
    #[test]
    fn dedicated_thread_pool_zero_threads_rejected_at_validate() {
        let err = ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(16)
            .num_threads(0)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[cfg(not(feature = "parallel"))]
    #[test]
    fn num_threads_field_inert_without_parallel_feature() {
        let f = ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(16)
            .num_threads(4)
            .build()
            .expect("config with num_threads still validates without parallel feature");
        assert_eq!(f.config().num_threads, Some(4));
    }

    #[test]
    fn update_rejects_non_finite() {
        let mut f = small_forest();
        assert!(matches!(
            f.update([1.0, f64::NAN]).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn first_update_inserts_in_every_tree() {
        let mut f = small_forest();
        f.update([0.0, 0.0]).unwrap();
        for (tree, sampler, _) in f.trees() {
            assert!(tree.root().is_some());
            assert_eq!(sampler.len(), 1);
        }
        assert_eq!(f.updates_seen(), 1);
    }

    #[test]
    fn many_updates_keep_sample_size_bound() {
        let mut f = small_forest();
        for i in 0..500 {
            let v = i as f64;
            f.update([v, v + 1.0]).unwrap();
        }
        for (tree, sampler, _) in f.trees() {
            assert!(sampler.len() <= sampler.capacity());
            assert!(tree.distinct_point_count() <= sampler.capacity());
        }
    }

    #[test]
    fn score_empty_forest_returns_err() {
        let f = small_forest();
        assert!(matches!(
            f.score(&[1.0, 2.0]).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn score_returns_non_negative() {
        let f = trained_forest();
        let score: f64 = f.score(&[0.5, 0.5]).unwrap().into();
        assert!(score >= 0.0);
        assert!(score.is_finite());
    }

    #[test]
    fn score_with_confidence_matches_score_mean() {
        let f = trained_forest();
        let probe = [0.5, 0.5];
        let plain: f64 = f.score(&probe).unwrap().into();
        let ci = f.score_with_confidence(&probe).unwrap();
        assert_eq!(f64::from(ci.score), plain);
        assert!(ci.trees_evaluated > 0);
        assert!(ci.stderr >= 0.0);
        assert!(ci.stddev >= 0.0);
        let (lo, hi) = ci.ci95();
        assert!(lo <= plain);
        assert!(hi >= plain);
    }

    #[test]
    fn score_and_attribution_matches_split_calls() {
        let f = trained_forest();
        let probe = [5.0, -3.0];
        let s_split: f64 = f.score(&probe).unwrap().into();
        let di_split = f.attribution(&probe).unwrap();
        let (s_merged, di_merged) = f.score_and_attribution(&probe).unwrap();
        assert!((f64::from(s_merged) - s_split).abs() < 1e-9);
        for d in 0..2 {
            assert!((di_merged.high()[d] - di_split.high()[d]).abs() < 1e-9);
            assert!((di_merged.low()[d] - di_split.low()[d]).abs() < 1e-9);
        }
    }

    #[test]
    fn score_trimmed_rejects_out_of_range_fraction() {
        let mut f = small_forest();
        feed_constant(&mut f, 50);
        assert!(matches!(
            f.score_trimmed(&[0.0, 0.0], -0.1).unwrap_err(),
            RcfError::InvalidConfig(_)
        ));
        assert!(matches!(
            f.score_trimmed(&[0.0, 0.0], 0.5).unwrap_err(),
            RcfError::InvalidConfig(_)
        ));
        assert!(matches!(
            f.score_trimmed(&[0.0, 0.0], f64::NAN).unwrap_err(),
            RcfError::InvalidConfig(_)
        ));
    }

    #[test]
    fn score_trimmed_zero_fraction_matches_score() {
        let f = trained_forest();
        let probe = [5.0, -3.0];
        let plain: f64 = f.score(&probe).unwrap().into();
        let trimmed: f64 = f.score_trimmed(&probe, 0.0).unwrap().into();
        assert!((plain - trimmed).abs() < 1.0e-9);
    }

    #[test]
    fn score_trimmed_drops_outlier_trees() {
        let f = trained_forest();
        let probe = [5.0, -3.0];
        let plain: f64 = f.score(&probe).unwrap().into();
        let trimmed: f64 = f.score_trimmed(&probe, 0.25).unwrap().into();
        assert!(trimmed.is_finite());
        assert!(trimmed >= 0.0);
        assert!(trimmed <= plain.max(1.0) * 2.0);
    }

    #[test]
    fn score_codisp_stateless_returns_non_negative_on_trained_forest() {
        let f = trained_forest();
        let s: f64 = f.score_codisp_stateless(&[5.0, -3.0]).unwrap().into();
        assert!(s.is_finite());
        assert!(s >= 0.0);
    }

    #[test]
    fn score_codisp_stateless_rejects_empty_forest() {
        let f = small_forest();
        assert!(matches!(
            f.score_codisp_stateless(&[0.0, 0.0]).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn score_codisp_stateless_rejects_nan() {
        let mut f = small_forest();
        feed_constant(&mut f, 50);
        assert!(matches!(
            f.score_codisp_stateless(&[f64::NAN, 0.0]).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn score_codisp_stateless_many_empty_input() {
        let mut f = small_forest();
        feed_constant(&mut f, 50);
        let out = f.score_codisp_stateless_many(&[]).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn score_codisp_stateless_many_matches_single_probe_loop() {
        let f = trained_forest();
        let probes = [[0.5, 1.0], [5.0, -3.0], [0.2, 0.7]];
        let single: Vec<f64> = probes
            .iter()
            .map(|p| f64::from(f.score_codisp_stateless(p).unwrap()))
            .collect();
        let batched: Vec<f64> = f
            .score_codisp_stateless_many(&probes)
            .unwrap()
            .into_iter()
            .map(f64::from)
            .collect();
        assert_eq!(single.len(), batched.len());
        for (a, b) in single.iter().zip(batched.iter()) {
            assert!((a - b).abs() < 1e-12);
        }
    }

    #[test]
    fn score_codisp_stateless_many_handles_large_batch() {
        let f = trained_forest();
        let n_probes = 10_000;
        let probes: Vec<[f64; 2]> = (0..n_probes)
            .map(|i| {
                let x = (i as f64) * 0.001;
                [x, x + 0.5]
            })
            .collect();
        let out = f.score_codisp_stateless_many(&probes).unwrap();
        assert_eq!(out.len(), n_probes);
        for s in &out {
            let v: f64 = (*s).into();
            assert!(v.is_finite() && v >= 0.0);
        }
    }

    #[test]
    fn score_codisp_stateless_does_not_drift_across_many_probes() {
        let f = trained_forest();
        let probe = [5.0, -3.0];
        let first: f64 = f.score_codisp_stateless(&probe).unwrap().into();
        for _ in 0..5_000 {
            let _ = f.score_codisp_stateless(&probe).unwrap();
        }
        let last: f64 = f.score_codisp_stateless(&probe).unwrap().into();
        assert!(
            (first - last).abs() < 1e-12,
            "stateless codisp drifted: first={first} last={last}"
        );
    }

    #[test]
    fn score_codisp_many_empty_input() {
        let mut f = small_forest();
        feed_constant(&mut f, 50);
        let out = f.score_codisp_many(&[]).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn score_codisp_many_returns_one_score_per_probe() {
        let mut f = trained_forest();
        let probes = [[0.5, 1.0], [5.0, -3.0], [0.2, 0.7]];
        let scores = f.score_codisp_many(&probes).unwrap();
        assert_eq!(scores.len(), 3);
        for s in &scores {
            let v: f64 = (*s).into();
            assert!(v.is_finite() && v >= 0.0);
        }
    }

    #[test]
    fn score_codisp_many_rejects_nan() {
        let mut f = small_forest();
        feed_constant(&mut f, 50);
        let err = f
            .score_codisp_many(&[[0.0, 0.0], [f64::NAN, 0.0]])
            .unwrap_err();
        assert!(matches!(err, RcfError::NaNValue));
    }

    #[test]
    fn score_and_attribution_rejects_empty_forest() {
        let f = small_forest();
        assert!(matches!(
            f.score_and_attribution(&[0.0, 0.0]).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn score_and_attribution_rejects_nan() {
        let mut f = small_forest();
        feed_constant(&mut f, 10);
        assert!(matches!(
            f.score_and_attribution(&[f64::NAN, 0.0]).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn score_with_confidence_rejects_empty_forest() {
        let f = small_forest();
        assert!(matches!(
            f.score_with_confidence(&[0.0, 0.0]).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn score_with_confidence_rejects_nan_input() {
        let mut f = small_forest();
        feed_constant(&mut f, 10);
        assert!(matches!(
            f.score_with_confidence(&[f64::NAN, 0.0]).unwrap_err(),
            RcfError::NaNValue
        ));
    }

    #[test]
    fn outlier_scores_higher_than_cluster_member() {
        let mut f = ForestBuilder::<2>::new()
            .num_trees(50)
            .sample_size(64)
            .seed(7)
            .build()
            .unwrap();
        let mut rng = ChaCha8Rng::seed_from_u64(99);
        for _ in 0..200 {
            let p = [rng.random::<f64>() * 0.1, rng.random::<f64>() * 0.1];
            f.update(p).unwrap();
        }
        let cluster_score: f64 = f.score(&[0.05, 0.05]).unwrap().into();
        let outlier_score: f64 = f.score(&[10.0, 10.0]).unwrap().into();
        assert!(
            outlier_score > cluster_score,
            "outlier {outlier_score} not > cluster {cluster_score}"
        );
    }

    #[test]
    fn attribution_dim_matches_config() {
        let mut f = ForestBuilder::<4>::new()
            .num_trees(50)
            .sample_size(32)
            .seed(2026)
            .build()
            .unwrap();
        for i in 0..100 {
            let v = i as f64 * 0.01;
            f.update([v, v, v, v]).unwrap();
        }
        let di = f.attribution(&[0.5, 0.5, 0.5, 0.5]).unwrap();
        assert_eq!(di.dim(), 4);
    }

    #[test]
    fn attribution_empty_forest_returns_err() {
        let f = small_forest();
        assert!(matches!(
            f.attribution(&[0.0, 0.0]).unwrap_err(),
            RcfError::EmptyForest
        ));
    }

    #[test]
    fn deterministic_under_fixed_seed() {
        fn build_and_score(seed: u64) -> f64 {
            let mut f = ForestBuilder::<2>::new()
                .num_trees(50)
                .sample_size(16)
                .seed(seed)
                .build()
                .unwrap();
            let mut rng = ChaCha8Rng::seed_from_u64(seed);
            for _ in 0..100 {
                f.update([rng.random::<f64>(), rng.random::<f64>()])
                    .unwrap();
            }
            f.score(&[5.0, 5.0]).unwrap().into()
        }
        let s1 = build_and_score(2026);
        let s2 = build_and_score(2026);
        assert_eq!(s1, s2);
    }

    // The bound asserted below is 8 MiB; the test name previously said 4mb,
    // which did not match the assertion.
    #[test]
    fn memory_estimate_within_8mb_at_default_config() {
        let mut f = ForestBuilder::<16>::new().seed(1).build().unwrap();
        for i in 0..(100 * 256) {
            let v = i as f64;
            f.update([v; 16]).unwrap();
        }
        let bytes = f.memory_estimate();
        assert!(bytes < 8 * 1024 * 1024, "memory_estimate = {bytes}");
    }

    #[test]
    fn point_store_capacity_stays_bounded() {
        let mut f = small_forest();
        for i in 0..1000 {
            let v = i as f64;
            f.update([v, v]).unwrap();
        }
        let live = f.point_store().live_count();
        assert!(live <= f.num_trees() * f.sample_size());
    }
}