use core::cmp::Ordering;
use core::hash::Hash;
use std::collections::{BinaryHeap, HashMap};
use std::time::{Duration, Instant};
use crate::bootstrap::BootstrapReport;
use crate::domain::DiVector;
use crate::error::{RcfError, RcfResult};
use crate::thresholded::{AnomalyGrade, ThresholdedForest};
type ForestFactory<const D: usize> = dyn Fn() -> RcfResult<ThresholdedForest<D>>;
#[derive(Debug)]
struct TenantSlot<const D: usize> {
forest: Box<ThresholdedForest<D>>,
last_access: u64,
last_access_instant: Instant,
}
struct MostSimilarHeapEntry<K: Clone> {
sim: f64,
key: K,
}
impl<K: Clone> PartialEq for MostSimilarHeapEntry<K> {
fn eq(&self, other: &Self) -> bool {
self.sim == other.sim
}
}
impl<K: Clone> Eq for MostSimilarHeapEntry<K> {}
impl<K: Clone> PartialOrd for MostSimilarHeapEntry<K> {
fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ReadinessSummary {
pub resident: usize,
pub warming: usize,
pub ready: usize,
pub capacity: usize,
pub tenants_created_lifetime: u64,
pub tenants_evicted_lifetime: u64,
}
impl ReadinessSummary {
#[must_use]
pub fn readiness_ratio(&self) -> f64 {
if self.resident == 0 {
#[allow(clippy::cast_precision_loss)]
return f64::NAN;
}
#[allow(clippy::cast_precision_loss)]
{
self.ready as f64 / self.resident as f64
}
}
#[must_use]
pub fn is_fully_ready(&self) -> bool {
self.warming == 0
}
#[must_use]
pub fn is_at_capacity(&self) -> bool {
self.resident >= self.capacity
}
}
impl<K: Clone> Ord for MostSimilarHeapEntry<K> {
fn cmp(&self, other: &Self) -> core::cmp::Ordering {
other
.sim
.partial_cmp(&self.sim)
.unwrap_or(core::cmp::Ordering::Equal)
}
}
pub struct TenantForestPool<K, const D: usize>
where
K: Hash + Eq + Clone,
{
forests: HashMap<K, TenantSlot<D>>,
capacity: usize,
access_counter: u64,
tenants_created_lifetime: u64,
tenants_evicted_lifetime: u64,
factory: Box<ForestFactory<D>>,
metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}
impl<K, const D: usize> core::fmt::Debug for TenantForestPool<K, D>
where
K: Hash + Eq + Clone + core::fmt::Debug,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("TenantForestPool")
.field("capacity", &self.capacity)
.field("len", &self.forests.len())
.field("access_counter", &self.access_counter)
.field("tenants_created_lifetime", &self.tenants_created_lifetime)
.field("tenants_evicted_lifetime", &self.tenants_evicted_lifetime)
.field("tenants", &self.forests.keys().collect::<Vec<_>>())
.field("factory", &"<dyn Fn>")
.field("metrics", &self.metrics)
.finish()
}
}
impl<K, const D: usize> TenantForestPool<K, D>
where
K: Hash + Eq + Clone,
{
pub fn new<F>(capacity: usize, factory: F) -> RcfResult<Self>
where
F: Fn() -> RcfResult<ThresholdedForest<D>> + 'static,
{
if capacity == 0 {
return Err(RcfError::InvalidConfig(
"TenantForestPool capacity must be > 0".into(),
));
}
Ok(Self {
forests: HashMap::with_capacity(capacity),
capacity,
access_counter: 0,
tenants_created_lifetime: 0,
tenants_evicted_lifetime: 0,
factory: Box::new(factory),
metrics: crate::metrics::default_sink(),
})
}
#[must_use]
pub fn with_metrics_sink(
mut self,
sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
) -> Self {
#[allow(clippy::cast_precision_loss)]
sink.set_gauge(
crate::metrics::names::TENANTS_RESIDENT,
self.forests.len() as f64,
);
#[allow(clippy::cast_precision_loss)]
sink.set_gauge(crate::metrics::names::TENANT_CAPACITY, self.capacity as f64);
self.metrics = sink;
self
}
#[must_use]
pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
&self.metrics
}
fn emit_resident_gauge(&self) {
#[allow(clippy::cast_precision_loss)]
self.metrics.set_gauge(
crate::metrics::names::TENANTS_RESIDENT,
self.forests.len() as f64,
);
}
#[must_use]
pub fn capacity(&self) -> usize {
self.capacity
}
#[must_use]
pub fn len(&self) -> usize {
self.forests.len()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.forests.is_empty()
}
#[must_use]
pub fn contains(&self, key: &K) -> bool {
self.forests.contains_key(key)
}
#[must_use]
pub fn peek(&self, key: &K) -> Option<&ThresholdedForest<D>> {
self.forests.get(key).map(|slot| slot.forest.as_ref())
}
pub fn get(&mut self, key: &K) -> Option<&ThresholdedForest<D>> {
let tick = self.bump_access();
let now = Instant::now();
self.forests.get_mut(key).map(|slot| {
slot.last_access = tick;
slot.last_access_instant = now;
&*slot.forest
})
}
pub fn get_mut(&mut self, key: &K) -> Option<&mut ThresholdedForest<D>> {
let tick = self.bump_access();
let now = Instant::now();
self.forests.get_mut(key).map(|slot| {
slot.last_access = tick;
slot.last_access_instant = now;
slot.forest.as_mut()
})
}
pub fn process(&mut self, key: &K, point: [f64; D]) -> RcfResult<AnomalyGrade> {
self.touch_or_create(key)?.process(point)
}
pub fn score_only(&mut self, key: &K, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
self.touch_or_create(key)?.score_only(point)
}
pub fn attribution(&mut self, key: &K, point: &[f64; D]) -> RcfResult<DiVector> {
self.touch_or_create(key)?.attribution(point)
}
pub fn score_only_many(
&mut self,
key: &K,
points: &[[f64; D]],
) -> RcfResult<Option<Vec<AnomalyGrade>>> {
match self.get_mut(key) {
Some(detector) => Ok(Some(detector.score_only_many(points)?)),
None => Ok(None),
}
}
pub fn score_many_early_term(
&mut self,
key: &K,
points: &[[f64; D]],
config: crate::early_term::EarlyTermConfig,
) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
self.touch_or_create(key)?
.score_many_early_term(points, config)
}
pub fn score_across_tenants(&self, point: &[f64; D]) -> RcfResult<Vec<(K, AnomalyGrade)>>
where
K: Send + Sync,
{
#[cfg(feature = "parallel")]
let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = {
use rayon::prelude::*;
let entries: Vec<(&K, &ThresholdedForest<D>)> = self
.forests
.iter()
.map(|(k, slot)| (k, slot.forest.as_ref()))
.collect();
entries
.par_iter()
.map(|(k, f)| -> RcfResult<Option<(K, AnomalyGrade)>> {
let grade = f.score_only(point)?;
if !grade.ready() {
return Ok(None);
}
Ok(Some(((*k).clone(), grade)))
})
.collect()
};
#[cfg(not(feature = "parallel"))]
let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = self
.forests
.iter()
.map(|(k, slot)| -> RcfResult<Option<(K, AnomalyGrade)>> {
let grade = slot.forest.score_only(point)?;
if !grade.ready() {
return Ok(None);
}
Ok(Some((k.clone(), grade)))
})
.collect();
let mut out: Vec<(K, AnomalyGrade)> = collected?.into_iter().flatten().collect();
out.sort_by(|a, b| {
b.1.grade()
.partial_cmp(&a.1.grade())
.unwrap_or(core::cmp::Ordering::Equal)
});
Ok(out)
}
#[must_use]
pub fn similarity_matrix(&self, min_observations: u64) -> Vec<(K, K, f64)>
where
K: Send + Sync,
{
let tenants: Vec<(&K, &ThresholdedForest<D>)> = self
.forests
.iter()
.filter_map(|(k, slot)| {
if slot.forest.stats().observations() >= min_observations {
Some((k, slot.forest.as_ref()))
} else {
None
}
})
.collect();
let n = tenants.len();
#[cfg(feature = "parallel")]
{
use rayon::prelude::*;
let mut pairs: Vec<(usize, usize)> = Vec::with_capacity(n * n / 2);
for i in 0..n {
for j in (i + 1)..n {
pairs.push((i, j));
}
}
pairs
.par_iter()
.map(|&(i, j)| {
let (k_a, f_a) = tenants[i];
let (k_b, f_b) = tenants[j];
let dm = f_a.stats().mean() - f_b.stats().mean();
let ds = f_a.stats().stddev() - f_b.stats().stddev();
let dist = (dm * dm + ds * ds).sqrt();
(k_a.clone(), k_b.clone(), (-dist).exp())
})
.collect()
}
#[cfg(not(feature = "parallel"))]
{
let mut out = Vec::with_capacity(n * n / 2);
for (i, &(k_a, f_a)) in tenants.iter().enumerate() {
let mean_a = f_a.stats().mean();
let stddev_a = f_a.stats().stddev();
for &(k_b, f_b) in tenants.iter().skip(i + 1) {
let dm = mean_a - f_b.stats().mean();
let ds = stddev_a - f_b.stats().stddev();
let dist = (dm * dm + ds * ds).sqrt();
out.push((k_a.clone(), k_b.clone(), (-dist).exp()));
}
}
out
}
}
#[must_use]
pub fn most_similar(&self, key: &K, top_n: usize, min_observations: u64) -> Vec<(K, f64)> {
let Some(ref_slot) = self.forests.get(key) else {
return Vec::new();
};
let ref_stats = ref_slot.forest.stats();
if ref_stats.observations() < min_observations {
return Vec::new();
}
if top_n == 0 {
return Vec::new();
}
let mut heap: BinaryHeap<MostSimilarHeapEntry<K>> = BinaryHeap::with_capacity(top_n + 1);
for (k, slot) in &self.forests {
if k == key {
continue;
}
let stats = slot.forest.stats();
if stats.observations() < min_observations {
continue;
}
let dm = ref_stats.mean() - stats.mean();
let ds = ref_stats.stddev() - stats.stddev();
let dist = (dm * dm + ds * ds).sqrt();
let sim = (-dist).exp();
if heap.len() < top_n {
heap.push(MostSimilarHeapEntry {
sim,
key: k.clone(),
});
} else if let Some(min_entry) = heap.peek()
&& sim > min_entry.sim
{
heap.pop();
heap.push(MostSimilarHeapEntry {
sim,
key: k.clone(),
});
}
}
let mut out: Vec<(K, f64)> = heap.into_iter().map(|e| (e.key, e.sim)).collect();
out.sort_unstable_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));
out
}
pub fn forensic_baseline(
&mut self,
key: &K,
point: &[f64; D],
) -> RcfResult<Option<crate::forensic::ForensicBaseline<D>>> {
match self.get_mut(key) {
Some(detector) => Ok(Some(detector.forensic_baseline(point)?)),
None => Ok(None),
}
}
pub fn attribution_many(&mut self, key: &K, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
self.touch_or_create(key)?.attribution_many(points)
}
pub fn process_at(
&mut self,
key: &K,
point: [f64; D],
timestamp: u64,
) -> RcfResult<AnomalyGrade> {
self.touch_or_create(key)?.process_at(point, timestamp)
}
pub fn delete_before(&mut self, key: &K, cutoff: u64) -> RcfResult<usize> {
match self.get_mut(key) {
Some(detector) => detector.delete_before(cutoff),
None => Ok(0),
}
}
pub fn score_early_term(
&mut self,
key: &K,
point: &[f64; D],
config: crate::early_term::EarlyTermConfig,
) -> RcfResult<crate::early_term::EarlyTermScore> {
self.touch_or_create(key)?.score_early_term(point, config)
}
pub fn delete(&mut self, key: &K, point_idx: usize) -> RcfResult<bool> {
match self.get_mut(key) {
Some(detector) => detector.delete(point_idx),
None => Ok(false),
}
}
pub fn delete_by_value(&mut self, key: &K, point: &[f64; D]) -> RcfResult<usize> {
match self.get_mut(key) {
Some(detector) => detector.delete_by_value(point),
None => Ok(0),
}
}
pub fn bootstrap<I>(&mut self, key: &K, points: I) -> RcfResult<BootstrapReport>
where
I: IntoIterator<Item = [f64; D]>,
{
self.touch_or_create(key)?.bootstrap(points)
}
pub fn insert(&mut self, key: K, forest: ThresholdedForest<D>) -> Option<ThresholdedForest<D>> {
let tick = self.bump_access();
let now = Instant::now();
if !self.forests.contains_key(&key) && self.forests.len() >= self.capacity {
self.evict_lru();
}
let previous = self
.forests
.insert(
key,
TenantSlot {
forest: Box::new(forest),
last_access: tick,
last_access_instant: now,
},
)
.map(|slot| *slot.forest);
self.emit_resident_gauge();
previous
}
pub fn remove(&mut self, key: &K) -> Option<ThresholdedForest<D>> {
let out = self.forests.remove(key).map(|slot| *slot.forest);
if out.is_some() {
self.emit_resident_gauge();
}
out
}
pub fn clear(&mut self) {
self.forests.clear();
self.emit_resident_gauge();
}
pub fn iter(&self) -> impl Iterator<Item = (&K, &ThresholdedForest<D>)> + '_ {
self.forests.iter().map(|(k, slot)| (k, &*slot.forest))
}
pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut ThresholdedForest<D>)> + '_ {
self.forests
.iter_mut()
.map(|(k, slot)| (k, slot.forest.as_mut()))
}
#[must_use]
pub fn tenants(&self) -> Vec<K> {
self.forests.keys().cloned().collect()
}
#[must_use]
pub fn readiness_summary(&self) -> ReadinessSummary {
let mut warming = 0_usize;
let mut ready = 0_usize;
for slot in self.forests.values() {
let cfg = slot.forest.thresholded_config();
if slot.forest.stats().observations() >= cfg.min_observations
&& slot.forest.stats().stddev() > 0.0
{
ready = ready.saturating_add(1);
} else {
warming = warming.saturating_add(1);
}
}
ReadinessSummary {
resident: self.forests.len(),
warming,
ready,
capacity: self.capacity,
tenants_created_lifetime: self.tenants_created_lifetime,
tenants_evicted_lifetime: self.tenants_evicted_lifetime,
}
}
pub fn evict_lru(&mut self) -> Option<(K, ThresholdedForest<D>)> {
let victim_key = self
.forests
.iter()
.min_by_key(|(_, slot)| slot.last_access)
.map(|(k, _)| k.clone())?;
let slot = self.forests.remove(&victim_key)?;
self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
self.metrics
.inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
self.emit_resident_gauge();
Some((victim_key, *slot.forest))
}
pub fn evict_idle(&mut self, ttl: Duration) -> Vec<(K, ThresholdedForest<D>)> {
let now = Instant::now();
let victims: Vec<K> = self
.forests
.iter()
.filter_map(|(k, slot)| {
if now.saturating_duration_since(slot.last_access_instant) > ttl {
Some(k.clone())
} else {
None
}
})
.collect();
let mut evicted = Vec::with_capacity(victims.len());
for key in victims {
if let Some(slot) = self.forests.remove(&key) {
self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
self.metrics
.inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
self.metrics
.inc_counter(crate::metrics::names::TENANT_IDLE_EVICTIONS_TOTAL, 1);
evicted.push((key, *slot.forest));
}
}
if !evicted.is_empty() {
self.emit_resident_gauge();
}
evicted
}
fn bump_access(&mut self) -> u64 {
self.access_counter = self.access_counter.saturating_add(1);
self.access_counter
}
fn touch_or_create(&mut self, key: &K) -> RcfResult<&mut ThresholdedForest<D>> {
let tick = self.bump_access();
let now = Instant::now();
if !self.forests.contains_key(key) {
if self.forests.len() >= self.capacity {
self.evict_lru();
}
let forest = (self.factory)()?;
self.forests.insert(
key.clone(),
TenantSlot {
forest: Box::new(forest),
last_access: tick,
last_access_instant: now,
},
);
self.tenants_created_lifetime = self.tenants_created_lifetime.saturating_add(1);
self.metrics
.inc_counter(crate::metrics::names::TENANT_CREATED_TOTAL, 1);
self.emit_resident_gauge();
}
let slot = self.forests.get_mut(key).expect("tenant was just inserted");
slot.last_access = tick;
slot.last_access_instant = now;
Ok(slot.forest.as_mut())
}
}
#[cfg(test)]
#[allow(clippy::float_cmp)] mod tests {
use super::*;
use crate::ThresholdedForestBuilder;
fn factory_2d() -> impl Fn() -> RcfResult<ThresholdedForest<2>> {
|| {
ThresholdedForestBuilder::<2>::new()
.num_trees(50)
.sample_size(16)
.min_observations(4)
.min_threshold(0.0)
.seed(42)
.build()
}
}
#[test]
fn new_rejects_zero_capacity() {
let err = TenantForestPool::<String, 2>::new(0, factory_2d()).unwrap_err();
assert!(matches!(err, RcfError::InvalidConfig(_)));
}
#[test]
fn new_accepts_capacity_one() {
let p = TenantForestPool::<String, 2>::new(1, factory_2d()).unwrap();
assert_eq!(p.capacity(), 1);
assert_eq!(p.len(), 0);
assert!(p.is_empty());
}
#[test]
fn process_auto_creates_tenant() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
assert!(!p.contains(&"a"));
p.process(&"a", [0.0, 0.0]).unwrap();
assert!(p.contains(&"a"));
assert_eq!(p.len(), 1);
}
#[test]
fn process_evicts_lru_when_full() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
p.process(&"a", [0.1, 0.1]).unwrap();
p.process(&"c", [2.0, 2.0]).unwrap();
assert!(p.contains(&"a"));
assert!(!p.contains(&"b"), "b should have been evicted");
assert!(p.contains(&"c"));
assert_eq!(p.len(), 2);
}
#[test]
fn peek_does_not_update_lru() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"old", [0.0, 0.0]).unwrap();
p.process(&"new", [1.0, 1.0]).unwrap();
let _ = p.peek(&"old");
p.process(&"newer", [2.0, 2.0]).unwrap();
assert!(!p.contains(&"old"), "peek should not refresh LRU");
}
#[test]
fn get_does_update_lru() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"old", [0.0, 0.0]).unwrap();
p.process(&"new", [1.0, 1.0]).unwrap();
let _ = p.get(&"old");
p.process(&"newer", [2.0, 2.0]).unwrap();
assert!(p.contains(&"old"), "get should refresh LRU");
assert!(!p.contains(&"new"), "new should be evicted instead");
}
#[test]
fn remove_returns_detector() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
let detector = p.remove(&"a").unwrap();
assert_eq!(detector.forest().num_trees(), 50);
assert!(!p.contains(&"a"));
}
#[test]
fn remove_returns_none_for_missing_tenant() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
assert!(p.remove(&"nope").is_none());
}
#[test]
fn insert_replaces_existing() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
let fresh = (factory_2d())().unwrap();
let old = p.insert("a", fresh).unwrap();
assert_eq!(old.forest().num_trees(), 50);
assert!(p.contains(&"a"));
assert_eq!(p.len(), 1);
}
#[test]
fn insert_evicts_when_full_and_key_new() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
let fresh = (factory_2d())().unwrap();
p.insert("c", fresh);
assert_eq!(p.len(), 2);
assert!(p.contains(&"c"));
}
#[test]
fn clear_drops_all_tenants() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
p.clear();
assert!(p.is_empty());
}
#[test]
fn iter_visits_every_tenant() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
let mut keys: Vec<&&str> = p.iter().map(|(k, _)| k).collect();
keys.sort();
assert_eq!(keys, vec![&"a", &"b"]);
}
#[test]
fn evict_lru_returns_oldest_tenant() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
p.process(&"a", [0.1, 0.1]).unwrap();
let (key, _) = p.evict_lru().unwrap();
assert_eq!(key, "b");
}
#[test]
fn evict_lru_on_empty_pool_returns_none() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
assert!(p.evict_lru().is_none());
}
#[test]
fn evict_idle_retains_fresh_and_evicts_stale() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
std::thread::sleep(std::time::Duration::from_millis(40));
p.process(&"a", [0.1, 0.1]).unwrap();
let evicted = p.evict_idle(std::time::Duration::from_millis(20));
let evicted_keys: Vec<&&str> = evicted.iter().map(|(k, _)| k).collect();
assert_eq!(evicted_keys, vec![&"b"]);
assert!(p.contains(&"a"));
assert!(!p.contains(&"b"));
}
#[test]
fn evict_idle_empty_pool_returns_empty() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
let evicted = p.evict_idle(std::time::Duration::from_secs(1));
assert!(evicted.is_empty());
}
#[test]
fn readiness_summary_empty_pool() {
let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
let s = p.readiness_summary();
assert_eq!(s.resident, 0);
assert_eq!(s.warming, 0);
assert_eq!(s.ready, 0);
assert_eq!(s.capacity, 4);
assert_eq!(s.tenants_created_lifetime, 0);
assert_eq!(s.tenants_evicted_lifetime, 0);
assert!(s.is_fully_ready()); assert!(!s.is_at_capacity());
assert!(s.readiness_ratio().is_nan());
}
#[test]
fn readiness_summary_counts_warming_and_ready() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
for i in 0_u32..16 {
let v = f64::from(i) * 0.01;
p.process(&"a", [v, v]).unwrap();
}
p.process(&"b", [0.0, 0.0]).unwrap();
p.process(&"b", [0.01, 0.01]).unwrap();
let s = p.readiness_summary();
assert_eq!(s.resident, 2);
assert_eq!(s.ready, 1);
assert_eq!(s.warming, 1);
assert!((s.readiness_ratio() - 0.5).abs() < 1.0e-9);
assert!(!s.is_fully_ready());
}
#[test]
fn readiness_summary_tracks_lifetime_counters() {
let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
p.process(&"c", [2.0, 2.0]).unwrap(); let s = p.readiness_summary();
assert_eq!(s.tenants_created_lifetime, 3);
assert_eq!(s.tenants_evicted_lifetime, 1);
assert_eq!(s.resident, 2);
assert!(s.is_at_capacity());
}
#[test]
fn evict_idle_zero_ttl_evicts_all_non_just_touched() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
std::thread::sleep(std::time::Duration::from_millis(5));
let evicted = p.evict_idle(std::time::Duration::from_millis(0));
assert_eq!(evicted.len(), 2);
assert!(p.is_empty());
}
#[test]
fn factory_error_propagates() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, || {
Err(RcfError::InvalidConfig("forced".into()))
})
.unwrap();
let err = p.process(&"x", [0.0, 0.0]).unwrap_err();
assert!(matches!(err, RcfError::InvalidConfig(_)));
assert!(p.is_empty(), "failed factory should not leave an entry");
}
#[test]
fn score_only_auto_creates_but_leaves_stats_empty() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
let verdict = p.score_only(&"a", &[0.0, 0.0]).unwrap();
assert!(!verdict.ready(), "brand-new detector should warming-up");
assert!(p.contains(&"a"));
}
#[test]
fn tenants_returns_live_keys() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
p.process(&"a", [0.0, 0.0]).unwrap();
p.process(&"b", [1.0, 1.0]).unwrap();
let mut ts = p.tenants();
ts.sort_unstable();
assert_eq!(ts, vec!["a", "b"]);
}
#[test]
fn score_across_tenants_ranks_desc_and_skips_cold() {
let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
for i in 0_u32..16 {
let v = f64::from(i) * 0.01;
p.process(&"a", [v, v]).unwrap();
p.process(&"b", [v + 5.0, v + 5.0]).unwrap();
p.process(&"c", [v + 100.0, v + 100.0]).unwrap();
}
p.process(&"d", [0.5, 0.5]).unwrap();
let out = p.score_across_tenants(&[50.0, 50.0]).unwrap();
assert!(out.iter().all(|(k, _)| *k != "d"));
for [a, b] in out.array_windows::<2>() {
assert!(a.1.grade() >= b.1.grade());
}
}
#[test]
fn score_across_tenants_empty_pool_returns_empty() {
let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
let out = p.score_across_tenants(&[0.0, 0.0]).unwrap();
assert!(out.is_empty());
}
#[test]
fn similarity_matrix_empty_pool_returns_empty() {
let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
assert!(p.similarity_matrix(0).is_empty());
}
#[test]
fn similarity_matrix_skips_undertrained() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
for i in 0_u32..64 {
let v = f64::from(i) * 0.01;
p.process(&"a", [v, v]).unwrap();
}
for i in 0_u32..8 {
let v = f64::from(i) * 0.01;
p.process(&"b", [v, v]).unwrap();
}
let pairs = p.similarity_matrix(32);
assert!(pairs.is_empty(), "only A passed the min_obs threshold");
}
#[test]
fn most_similar_ranks_correctly() {
let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
for i in 0_u32..64 {
let v = f64::from(i) * 0.01;
p.process(&"a", [v, v]).unwrap();
p.process(&"c", [v, v]).unwrap();
p.process(&"b", [v + 10.0, v + 10.0]).unwrap();
}
let ranked = p.most_similar(&"a", 2, 1);
assert_eq!(ranked.len(), 2);
let c_sim = ranked.iter().find(|(k, _)| *k == "c").unwrap().1;
let b_sim = ranked.iter().find(|(k, _)| *k == "b").unwrap().1;
assert!(
c_sim >= b_sim,
"c similarity {c_sim} should be >= b {b_sim}"
);
}
#[test]
fn most_similar_absent_key_returns_empty() {
let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
assert!(p.most_similar(&"unknown", 3, 0).is_empty());
}
#[test]
fn isolation_between_tenants() {
let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
for i in 0_u32..32 {
let v = f64::from(i) * 0.01;
p.process(&"a", [v, v]).unwrap();
p.process(&"b", [v, v]).unwrap();
}
for _ in 0..10 {
p.process(&"a", [100.0, 100.0]).unwrap();
}
let a_threshold = p.peek(&"a").unwrap().current_threshold();
let b_threshold = p.peek(&"b").unwrap().current_threshold();
assert!(
a_threshold > b_threshold,
"tenant A threshold {a_threshold} should be > tenant B threshold {b_threshold}",
);
}
}