1use core::cmp::Ordering;
33use core::hash::Hash;
34use std::collections::{BinaryHeap, HashMap};
35use std::time::{Duration, Instant};
36
37use crate::bootstrap::BootstrapReport;
38use crate::domain::DiVector;
39use crate::error::{RcfError, RcfResult};
40use crate::thresholded::{AnomalyGrade, ThresholdedForest};
41
42type ForestFactory<const D: usize> = dyn Fn() -> RcfResult<ThresholdedForest<D>>;
44
45#[derive(Debug)]
47struct TenantSlot<const D: usize> {
48 forest: Box<ThresholdedForest<D>>,
52 last_access: u64,
55 last_access_instant: Instant,
61}
62
63struct MostSimilarHeapEntry<K: Clone> {
70 sim: f64,
72 key: K,
74}
75
76impl<K: Clone> PartialEq for MostSimilarHeapEntry<K> {
77 fn eq(&self, other: &Self) -> bool {
78 self.sim == other.sim
79 }
80}
81
82impl<K: Clone> Eq for MostSimilarHeapEntry<K> {}
83
84impl<K: Clone> PartialOrd for MostSimilarHeapEntry<K> {
85 fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
86 Some(self.cmp(other))
87 }
88}
89
90#[derive(Debug, Clone, Copy, PartialEq, Eq)]
106#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
107pub struct ReadinessSummary {
108 pub resident: usize,
110 pub warming: usize,
114 pub ready: usize,
116 pub capacity: usize,
118 pub tenants_created_lifetime: u64,
120 pub tenants_evicted_lifetime: u64,
122}
123
124impl ReadinessSummary {
125 #[must_use]
130 pub fn readiness_ratio(&self) -> f64 {
131 if self.resident == 0 {
132 #[allow(clippy::cast_precision_loss)]
133 return f64::NAN;
134 }
135 #[allow(clippy::cast_precision_loss)]
136 {
137 self.ready as f64 / self.resident as f64
138 }
139 }
140
141 #[must_use]
144 pub fn is_fully_ready(&self) -> bool {
145 self.warming == 0
146 }
147
148 #[must_use]
150 pub fn is_at_capacity(&self) -> bool {
151 self.resident >= self.capacity
152 }
153}
154
155impl<K: Clone> Ord for MostSimilarHeapEntry<K> {
156 fn cmp(&self, other: &Self) -> core::cmp::Ordering {
157 other
160 .sim
161 .partial_cmp(&self.sim)
162 .unwrap_or(core::cmp::Ordering::Equal)
163 }
164}
165
166pub struct TenantForestPool<K, const D: usize>
197where
198 K: Hash + Eq + Clone,
199{
200 forests: HashMap<K, TenantSlot<D>>,
202 capacity: usize,
204 access_counter: u64,
207 tenants_created_lifetime: u64,
211 tenants_evicted_lifetime: u64,
213 factory: Box<ForestFactory<D>>,
216 metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
220}
221
222impl<K, const D: usize> core::fmt::Debug for TenantForestPool<K, D>
223where
224 K: Hash + Eq + Clone + core::fmt::Debug,
225{
226 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
227 f.debug_struct("TenantForestPool")
230 .field("capacity", &self.capacity)
231 .field("len", &self.forests.len())
232 .field("access_counter", &self.access_counter)
233 .field("tenants_created_lifetime", &self.tenants_created_lifetime)
234 .field("tenants_evicted_lifetime", &self.tenants_evicted_lifetime)
235 .field("tenants", &self.forests.keys().collect::<Vec<_>>())
236 .field("factory", &"<dyn Fn>")
237 .field("metrics", &self.metrics)
238 .finish()
239 }
240}
241
242impl<K, const D: usize> TenantForestPool<K, D>
243where
244 K: Hash + Eq + Clone,
245{
246 pub fn new<F>(capacity: usize, factory: F) -> RcfResult<Self>
258 where
259 F: Fn() -> RcfResult<ThresholdedForest<D>> + 'static,
260 {
261 if capacity == 0 {
262 return Err(RcfError::InvalidConfig(
263 "TenantForestPool capacity must be > 0".into(),
264 ));
265 }
266 Ok(Self {
267 forests: HashMap::with_capacity(capacity),
268 capacity,
269 access_counter: 0,
270 tenants_created_lifetime: 0,
271 tenants_evicted_lifetime: 0,
272 factory: Box::new(factory),
273 metrics: crate::metrics::default_sink(),
274 })
275 }
276
277 #[must_use]
282 pub fn with_metrics_sink(
283 mut self,
284 sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
285 ) -> Self {
286 #[allow(clippy::cast_precision_loss)]
287 sink.set_gauge(
288 crate::metrics::names::TENANTS_RESIDENT,
289 self.forests.len() as f64,
290 );
291 #[allow(clippy::cast_precision_loss)]
292 sink.set_gauge(crate::metrics::names::TENANT_CAPACITY, self.capacity as f64);
293 self.metrics = sink;
294 self
295 }
296
297 #[must_use]
299 pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
300 &self.metrics
301 }
302
303 fn emit_resident_gauge(&self) {
306 #[allow(clippy::cast_precision_loss)]
307 self.metrics.set_gauge(
308 crate::metrics::names::TENANTS_RESIDENT,
309 self.forests.len() as f64,
310 );
311 }
312
313 #[must_use]
315 pub fn capacity(&self) -> usize {
316 self.capacity
317 }
318
319 #[must_use]
321 pub fn len(&self) -> usize {
322 self.forests.len()
323 }
324
325 #[must_use]
327 pub fn is_empty(&self) -> bool {
328 self.forests.is_empty()
329 }
330
331 #[must_use]
333 pub fn contains(&self, key: &K) -> bool {
334 self.forests.contains_key(key)
335 }
336
337 #[must_use]
342 pub fn peek(&self, key: &K) -> Option<&ThresholdedForest<D>> {
343 self.forests.get(key).map(|slot| slot.forest.as_ref())
344 }
345
346 pub fn get(&mut self, key: &K) -> Option<&ThresholdedForest<D>> {
349 let tick = self.bump_access();
350 let now = Instant::now();
351 self.forests.get_mut(key).map(|slot| {
352 slot.last_access = tick;
353 slot.last_access_instant = now;
354 &*slot.forest
355 })
356 }
357
358 pub fn get_mut(&mut self, key: &K) -> Option<&mut ThresholdedForest<D>> {
363 let tick = self.bump_access();
364 let now = Instant::now();
365 self.forests.get_mut(key).map(|slot| {
366 slot.last_access = tick;
367 slot.last_access_instant = now;
368 slot.forest.as_mut()
369 })
370 }
371
372 pub fn process(&mut self, key: &K, point: [f64; D]) -> RcfResult<AnomalyGrade> {
385 self.touch_or_create(key)?.process(point)
386 }
387
388 pub fn score_only(&mut self, key: &K, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
399 self.touch_or_create(key)?.score_only(point)
400 }
401
402 pub fn attribution(&mut self, key: &K, point: &[f64; D]) -> RcfResult<DiVector> {
409 self.touch_or_create(key)?.attribution(point)
410 }
411
412 pub fn score_only_many(
421 &mut self,
422 key: &K,
423 points: &[[f64; D]],
424 ) -> RcfResult<Option<Vec<AnomalyGrade>>> {
425 match self.get_mut(key) {
426 Some(detector) => Ok(Some(detector.score_only_many(points)?)),
427 None => Ok(None),
428 }
429 }
430
431 pub fn score_many_early_term(
438 &mut self,
439 key: &K,
440 points: &[[f64; D]],
441 config: crate::early_term::EarlyTermConfig,
442 ) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
443 self.touch_or_create(key)?
444 .score_many_early_term(points, config)
445 }
446
447 pub fn score_across_tenants(&self, point: &[f64; D]) -> RcfResult<Vec<(K, AnomalyGrade)>>
466 where
467 K: Send + Sync,
468 {
469 #[cfg(feature = "parallel")]
470 let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = {
471 use rayon::prelude::*;
472 let entries: Vec<(&K, &ThresholdedForest<D>)> = self
474 .forests
475 .iter()
476 .map(|(k, slot)| (k, slot.forest.as_ref()))
477 .collect();
478 entries
479 .par_iter()
480 .map(|(k, f)| -> RcfResult<Option<(K, AnomalyGrade)>> {
481 let grade = f.score_only(point)?;
482 if !grade.ready() {
483 return Ok(None);
484 }
485 Ok(Some(((*k).clone(), grade)))
486 })
487 .collect()
488 };
489 #[cfg(not(feature = "parallel"))]
490 let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = self
491 .forests
492 .iter()
493 .map(|(k, slot)| -> RcfResult<Option<(K, AnomalyGrade)>> {
494 let grade = slot.forest.score_only(point)?;
495 if !grade.ready() {
496 return Ok(None);
497 }
498 Ok(Some((k.clone(), grade)))
499 })
500 .collect();
501
502 let mut out: Vec<(K, AnomalyGrade)> = collected?.into_iter().flatten().collect();
503 out.sort_by(|a, b| {
504 b.1.grade()
505 .partial_cmp(&a.1.grade())
506 .unwrap_or(core::cmp::Ordering::Equal)
507 });
508 Ok(out)
509 }
510
511 #[must_use]
523 pub fn similarity_matrix(&self, min_observations: u64) -> Vec<(K, K, f64)>
524 where
525 K: Send + Sync,
526 {
527 let tenants: Vec<(&K, &ThresholdedForest<D>)> = self
528 .forests
529 .iter()
530 .filter_map(|(k, slot)| {
531 if slot.forest.stats().observations() >= min_observations {
532 Some((k, slot.forest.as_ref()))
533 } else {
534 None
535 }
536 })
537 .collect();
538 let n = tenants.len();
539
540 #[cfg(feature = "parallel")]
541 {
542 use rayon::prelude::*;
543 let mut pairs: Vec<(usize, usize)> = Vec::with_capacity(n * n / 2);
548 for i in 0..n {
549 for j in (i + 1)..n {
550 pairs.push((i, j));
551 }
552 }
553 pairs
554 .par_iter()
555 .map(|&(i, j)| {
556 let (k_a, f_a) = tenants[i];
557 let (k_b, f_b) = tenants[j];
558 let dm = f_a.stats().mean() - f_b.stats().mean();
559 let ds = f_a.stats().stddev() - f_b.stats().stddev();
560 let dist = (dm * dm + ds * ds).sqrt();
561 (k_a.clone(), k_b.clone(), (-dist).exp())
562 })
563 .collect()
564 }
565 #[cfg(not(feature = "parallel"))]
566 {
567 let mut out = Vec::with_capacity(n * n / 2);
568 for (i, &(k_a, f_a)) in tenants.iter().enumerate() {
569 let mean_a = f_a.stats().mean();
570 let stddev_a = f_a.stats().stddev();
571 for &(k_b, f_b) in tenants.iter().skip(i + 1) {
572 let dm = mean_a - f_b.stats().mean();
573 let ds = stddev_a - f_b.stats().stddev();
574 let dist = (dm * dm + ds * ds).sqrt();
575 out.push((k_a.clone(), k_b.clone(), (-dist).exp()));
576 }
577 }
578 out
579 }
580 }
581
582 #[must_use]
589 pub fn most_similar(&self, key: &K, top_n: usize, min_observations: u64) -> Vec<(K, f64)> {
590 let Some(ref_slot) = self.forests.get(key) else {
591 return Vec::new();
592 };
593 let ref_stats = ref_slot.forest.stats();
594 if ref_stats.observations() < min_observations {
595 return Vec::new();
596 }
597 if top_n == 0 {
598 return Vec::new();
599 }
600
601 let mut heap: BinaryHeap<MostSimilarHeapEntry<K>> = BinaryHeap::with_capacity(top_n + 1);
607 for (k, slot) in &self.forests {
608 if k == key {
609 continue;
610 }
611 let stats = slot.forest.stats();
612 if stats.observations() < min_observations {
613 continue;
614 }
615 let dm = ref_stats.mean() - stats.mean();
616 let ds = ref_stats.stddev() - stats.stddev();
617 let dist = (dm * dm + ds * ds).sqrt();
618 let sim = (-dist).exp();
619 if heap.len() < top_n {
620 heap.push(MostSimilarHeapEntry {
621 sim,
622 key: k.clone(),
623 });
624 } else if let Some(min_entry) = heap.peek()
625 && sim > min_entry.sim
626 {
627 heap.pop();
628 heap.push(MostSimilarHeapEntry {
629 sim,
630 key: k.clone(),
631 });
632 }
633 }
634 let mut out: Vec<(K, f64)> = heap.into_iter().map(|e| (e.key, e.sim)).collect();
636 out.sort_unstable_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));
637 out
638 }
639
640 pub fn forensic_baseline(
648 &mut self,
649 key: &K,
650 point: &[f64; D],
651 ) -> RcfResult<Option<crate::forensic::ForensicBaseline<D>>> {
652 match self.get_mut(key) {
653 Some(detector) => Ok(Some(detector.forensic_baseline(point)?)),
654 None => Ok(None),
655 }
656 }
657
658 pub fn attribution_many(&mut self, key: &K, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
665 self.touch_or_create(key)?.attribution_many(points)
666 }
667
668 pub fn process_at(
676 &mut self,
677 key: &K,
678 point: [f64; D],
679 timestamp: u64,
680 ) -> RcfResult<AnomalyGrade> {
681 self.touch_or_create(key)?.process_at(point, timestamp)
682 }
683
684 pub fn delete_before(&mut self, key: &K, cutoff: u64) -> RcfResult<usize> {
693 match self.get_mut(key) {
694 Some(detector) => detector.delete_before(cutoff),
695 None => Ok(0),
696 }
697 }
698
699 pub fn score_early_term(
709 &mut self,
710 key: &K,
711 point: &[f64; D],
712 config: crate::early_term::EarlyTermConfig,
713 ) -> RcfResult<crate::early_term::EarlyTermScore> {
714 self.touch_or_create(key)?.score_early_term(point, config)
715 }
716
717 pub fn delete(&mut self, key: &K, point_idx: usize) -> RcfResult<bool> {
726 match self.get_mut(key) {
727 Some(detector) => detector.delete(point_idx),
728 None => Ok(false),
729 }
730 }
731
732 pub fn delete_by_value(&mut self, key: &K, point: &[f64; D]) -> RcfResult<usize> {
740 match self.get_mut(key) {
741 Some(detector) => detector.delete_by_value(point),
742 None => Ok(0),
743 }
744 }
745
746 pub fn bootstrap<I>(&mut self, key: &K, points: I) -> RcfResult<BootstrapReport>
764 where
765 I: IntoIterator<Item = [f64; D]>,
766 {
767 self.touch_or_create(key)?.bootstrap(points)
768 }
769
770 pub fn insert(&mut self, key: K, forest: ThresholdedForest<D>) -> Option<ThresholdedForest<D>> {
778 let tick = self.bump_access();
779 let now = Instant::now();
780 if !self.forests.contains_key(&key) && self.forests.len() >= self.capacity {
781 self.evict_lru();
782 }
783 let previous = self
784 .forests
785 .insert(
786 key,
787 TenantSlot {
788 forest: Box::new(forest),
789 last_access: tick,
790 last_access_instant: now,
791 },
792 )
793 .map(|slot| *slot.forest);
794 self.emit_resident_gauge();
795 previous
796 }
797
798 pub fn remove(&mut self, key: &K) -> Option<ThresholdedForest<D>> {
801 let out = self.forests.remove(key).map(|slot| *slot.forest);
802 if out.is_some() {
803 self.emit_resident_gauge();
804 }
805 out
806 }
807
808 pub fn clear(&mut self) {
810 self.forests.clear();
811 self.emit_resident_gauge();
812 }
813
814 pub fn iter(&self) -> impl Iterator<Item = (&K, &ThresholdedForest<D>)> + '_ {
817 self.forests.iter().map(|(k, slot)| (k, &*slot.forest))
818 }
819
820 pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut ThresholdedForest<D>)> + '_ {
824 self.forests
825 .iter_mut()
826 .map(|(k, slot)| (k, slot.forest.as_mut()))
827 }
828
829 #[must_use]
831 pub fn tenants(&self) -> Vec<K> {
832 self.forests.keys().cloned().collect()
833 }
834
835 #[must_use]
844 pub fn readiness_summary(&self) -> ReadinessSummary {
845 let mut warming = 0_usize;
846 let mut ready = 0_usize;
847 for slot in self.forests.values() {
848 let cfg = slot.forest.thresholded_config();
849 if slot.forest.stats().observations() >= cfg.min_observations
850 && slot.forest.stats().stddev() > 0.0
851 {
852 ready = ready.saturating_add(1);
853 } else {
854 warming = warming.saturating_add(1);
855 }
856 }
857 ReadinessSummary {
858 resident: self.forests.len(),
859 warming,
860 ready,
861 capacity: self.capacity,
862 tenants_created_lifetime: self.tenants_created_lifetime,
863 tenants_evicted_lifetime: self.tenants_evicted_lifetime,
864 }
865 }
866
867 pub fn evict_lru(&mut self) -> Option<(K, ThresholdedForest<D>)> {
875 let victim_key = self
876 .forests
877 .iter()
878 .min_by_key(|(_, slot)| slot.last_access)
879 .map(|(k, _)| k.clone())?;
880 let slot = self.forests.remove(&victim_key)?;
881 self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
882 self.metrics
883 .inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
884 self.emit_resident_gauge();
885 Some((victim_key, *slot.forest))
886 }
887
888 pub fn evict_idle(&mut self, ttl: Duration) -> Vec<(K, ThresholdedForest<D>)> {
904 let now = Instant::now();
905 let victims: Vec<K> = self
909 .forests
910 .iter()
911 .filter_map(|(k, slot)| {
912 if now.saturating_duration_since(slot.last_access_instant) > ttl {
913 Some(k.clone())
914 } else {
915 None
916 }
917 })
918 .collect();
919 let mut evicted = Vec::with_capacity(victims.len());
920 for key in victims {
921 if let Some(slot) = self.forests.remove(&key) {
922 self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
923 self.metrics
924 .inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
925 self.metrics
926 .inc_counter(crate::metrics::names::TENANT_IDLE_EVICTIONS_TOTAL, 1);
927 evicted.push((key, *slot.forest));
928 }
929 }
930 if !evicted.is_empty() {
931 self.emit_resident_gauge();
932 }
933 evicted
934 }
935
936 fn bump_access(&mut self) -> u64 {
938 self.access_counter = self.access_counter.saturating_add(1);
939 self.access_counter
940 }
941
942 fn touch_or_create(&mut self, key: &K) -> RcfResult<&mut ThresholdedForest<D>> {
946 let tick = self.bump_access();
947 let now = Instant::now();
948 if !self.forests.contains_key(key) {
949 if self.forests.len() >= self.capacity {
950 self.evict_lru();
951 }
952 let forest = (self.factory)()?;
953 self.forests.insert(
954 key.clone(),
955 TenantSlot {
956 forest: Box::new(forest),
957 last_access: tick,
958 last_access_instant: now,
959 },
960 );
961 self.tenants_created_lifetime = self.tenants_created_lifetime.saturating_add(1);
962 self.metrics
963 .inc_counter(crate::metrics::names::TENANT_CREATED_TOTAL, 1);
964 self.emit_resident_gauge();
965 }
966 let slot = self.forests.get_mut(key).expect("tenant was just inserted");
969 slot.last_access = tick;
970 slot.last_access_instant = now;
971 Ok(slot.forest.as_mut())
972 }
973}
974
975#[cfg(test)]
976#[allow(clippy::float_cmp)] mod tests {
978 use super::*;
979 use crate::ThresholdedForestBuilder;
980
981 fn factory_2d() -> impl Fn() -> RcfResult<ThresholdedForest<2>> {
982 || {
983 ThresholdedForestBuilder::<2>::new()
984 .num_trees(50)
985 .sample_size(16)
986 .min_observations(4)
987 .min_threshold(0.0)
988 .seed(42)
989 .build()
990 }
991 }
992
993 #[test]
994 fn new_rejects_zero_capacity() {
995 let err = TenantForestPool::<String, 2>::new(0, factory_2d()).unwrap_err();
996 assert!(matches!(err, RcfError::InvalidConfig(_)));
997 }
998
999 #[test]
1000 fn new_accepts_capacity_one() {
1001 let p = TenantForestPool::<String, 2>::new(1, factory_2d()).unwrap();
1002 assert_eq!(p.capacity(), 1);
1003 assert_eq!(p.len(), 0);
1004 assert!(p.is_empty());
1005 }
1006
1007 #[test]
1008 fn process_auto_creates_tenant() {
1009 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1010 assert!(!p.contains(&"a"));
1011 p.process(&"a", [0.0, 0.0]).unwrap();
1012 assert!(p.contains(&"a"));
1013 assert_eq!(p.len(), 1);
1014 }
1015
1016 #[test]
1017 fn process_evicts_lru_when_full() {
1018 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1019 p.process(&"a", [0.0, 0.0]).unwrap();
1020 p.process(&"b", [1.0, 1.0]).unwrap();
1021 p.process(&"a", [0.1, 0.1]).unwrap();
1023 p.process(&"c", [2.0, 2.0]).unwrap();
1024 assert!(p.contains(&"a"));
1025 assert!(!p.contains(&"b"), "b should have been evicted");
1026 assert!(p.contains(&"c"));
1027 assert_eq!(p.len(), 2);
1028 }
1029
1030 #[test]
1031 fn peek_does_not_update_lru() {
1032 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1033 p.process(&"old", [0.0, 0.0]).unwrap();
1034 p.process(&"new", [1.0, 1.0]).unwrap();
1035 let _ = p.peek(&"old");
1037 p.process(&"newer", [2.0, 2.0]).unwrap();
1038 assert!(!p.contains(&"old"), "peek should not refresh LRU");
1039 }
1040
1041 #[test]
1042 fn get_does_update_lru() {
1043 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1044 p.process(&"old", [0.0, 0.0]).unwrap();
1045 p.process(&"new", [1.0, 1.0]).unwrap();
1046 let _ = p.get(&"old");
1047 p.process(&"newer", [2.0, 2.0]).unwrap();
1048 assert!(p.contains(&"old"), "get should refresh LRU");
1049 assert!(!p.contains(&"new"), "new should be evicted instead");
1050 }
1051
1052 #[test]
1053 fn remove_returns_detector() {
1054 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1055 p.process(&"a", [0.0, 0.0]).unwrap();
1056 let detector = p.remove(&"a").unwrap();
1057 assert_eq!(detector.forest().num_trees(), 50);
1058 assert!(!p.contains(&"a"));
1059 }
1060
1061 #[test]
1062 fn remove_returns_none_for_missing_tenant() {
1063 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1064 assert!(p.remove(&"nope").is_none());
1065 }
1066
1067 #[test]
1068 fn insert_replaces_existing() {
1069 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1070 p.process(&"a", [0.0, 0.0]).unwrap();
1071 let fresh = (factory_2d())().unwrap();
1072 let old = p.insert("a", fresh).unwrap();
1073 assert_eq!(old.forest().num_trees(), 50);
1074 assert!(p.contains(&"a"));
1075 assert_eq!(p.len(), 1);
1076 }
1077
1078 #[test]
1079 fn insert_evicts_when_full_and_key_new() {
1080 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1081 p.process(&"a", [0.0, 0.0]).unwrap();
1082 p.process(&"b", [1.0, 1.0]).unwrap();
1083 let fresh = (factory_2d())().unwrap();
1084 p.insert("c", fresh);
1085 assert_eq!(p.len(), 2);
1086 assert!(p.contains(&"c"));
1087 }
1088
1089 #[test]
1090 fn clear_drops_all_tenants() {
1091 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1092 p.process(&"a", [0.0, 0.0]).unwrap();
1093 p.process(&"b", [1.0, 1.0]).unwrap();
1094 p.clear();
1095 assert!(p.is_empty());
1096 }
1097
1098 #[test]
1099 fn iter_visits_every_tenant() {
1100 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1101 p.process(&"a", [0.0, 0.0]).unwrap();
1102 p.process(&"b", [1.0, 1.0]).unwrap();
1103 let mut keys: Vec<&&str> = p.iter().map(|(k, _)| k).collect();
1104 keys.sort();
1105 assert_eq!(keys, vec![&"a", &"b"]);
1106 }
1107
1108 #[test]
1109 fn evict_lru_returns_oldest_tenant() {
1110 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1111 p.process(&"a", [0.0, 0.0]).unwrap();
1112 p.process(&"b", [1.0, 1.0]).unwrap();
1113 p.process(&"a", [0.1, 0.1]).unwrap();
1114 let (key, _) = p.evict_lru().unwrap();
1115 assert_eq!(key, "b");
1116 }
1117
1118 #[test]
1119 fn evict_lru_on_empty_pool_returns_none() {
1120 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1121 assert!(p.evict_lru().is_none());
1122 }
1123
1124 #[test]
1125 fn evict_idle_retains_fresh_and_evicts_stale() {
1126 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1127 p.process(&"a", [0.0, 0.0]).unwrap();
1128 p.process(&"b", [1.0, 1.0]).unwrap();
1129 std::thread::sleep(std::time::Duration::from_millis(40));
1132 p.process(&"a", [0.1, 0.1]).unwrap();
1133 let evicted = p.evict_idle(std::time::Duration::from_millis(20));
1134 let evicted_keys: Vec<&&str> = evicted.iter().map(|(k, _)| k).collect();
1135 assert_eq!(evicted_keys, vec![&"b"]);
1136 assert!(p.contains(&"a"));
1137 assert!(!p.contains(&"b"));
1138 }
1139
1140 #[test]
1141 fn evict_idle_empty_pool_returns_empty() {
1142 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1143 let evicted = p.evict_idle(std::time::Duration::from_secs(1));
1144 assert!(evicted.is_empty());
1145 }
1146
1147 #[test]
1148 fn readiness_summary_empty_pool() {
1149 let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1150 let s = p.readiness_summary();
1151 assert_eq!(s.resident, 0);
1152 assert_eq!(s.warming, 0);
1153 assert_eq!(s.ready, 0);
1154 assert_eq!(s.capacity, 4);
1155 assert_eq!(s.tenants_created_lifetime, 0);
1156 assert_eq!(s.tenants_evicted_lifetime, 0);
1157 assert!(s.is_fully_ready()); assert!(!s.is_at_capacity());
1159 assert!(s.readiness_ratio().is_nan());
1160 }
1161
1162 #[test]
1163 fn readiness_summary_counts_warming_and_ready() {
1164 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1165 for i in 0_u32..16 {
1167 let v = f64::from(i) * 0.01;
1168 p.process(&"a", [v, v]).unwrap();
1169 }
1170 p.process(&"b", [0.0, 0.0]).unwrap();
1172 p.process(&"b", [0.01, 0.01]).unwrap();
1173 let s = p.readiness_summary();
1174 assert_eq!(s.resident, 2);
1175 assert_eq!(s.ready, 1);
1176 assert_eq!(s.warming, 1);
1177 assert!((s.readiness_ratio() - 0.5).abs() < 1.0e-9);
1178 assert!(!s.is_fully_ready());
1179 }
1180
1181 #[test]
1182 fn readiness_summary_tracks_lifetime_counters() {
1183 let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
1184 p.process(&"a", [0.0, 0.0]).unwrap();
1185 p.process(&"b", [1.0, 1.0]).unwrap();
1186 p.process(&"c", [2.0, 2.0]).unwrap(); let s = p.readiness_summary();
1188 assert_eq!(s.tenants_created_lifetime, 3);
1189 assert_eq!(s.tenants_evicted_lifetime, 1);
1190 assert_eq!(s.resident, 2);
1191 assert!(s.is_at_capacity());
1192 }
1193
1194 #[test]
1195 fn evict_idle_zero_ttl_evicts_all_non_just_touched() {
1196 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1197 p.process(&"a", [0.0, 0.0]).unwrap();
1198 p.process(&"b", [1.0, 1.0]).unwrap();
1199 std::thread::sleep(std::time::Duration::from_millis(5));
1201 let evicted = p.evict_idle(std::time::Duration::from_millis(0));
1202 assert_eq!(evicted.len(), 2);
1203 assert!(p.is_empty());
1204 }
1205
1206 #[test]
1207 fn factory_error_propagates() {
1208 let mut p = TenantForestPool::<&'static str, 2>::new(4, || {
1209 Err(RcfError::InvalidConfig("forced".into()))
1210 })
1211 .unwrap();
1212 let err = p.process(&"x", [0.0, 0.0]).unwrap_err();
1213 assert!(matches!(err, RcfError::InvalidConfig(_)));
1214 assert!(p.is_empty(), "failed factory should not leave an entry");
1215 }
1216
1217 #[test]
1218 fn score_only_auto_creates_but_leaves_stats_empty() {
1219 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1220 let verdict = p.score_only(&"a", &[0.0, 0.0]).unwrap();
1221 assert!(!verdict.ready(), "brand-new detector should warming-up");
1222 assert!(p.contains(&"a"));
1223 }
1224
1225 #[test]
1226 fn tenants_returns_live_keys() {
1227 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1228 p.process(&"a", [0.0, 0.0]).unwrap();
1229 p.process(&"b", [1.0, 1.0]).unwrap();
1230 let mut ts = p.tenants();
1231 ts.sort_unstable();
1232 assert_eq!(ts, vec!["a", "b"]);
1233 }
1234
1235 #[test]
1236 fn score_across_tenants_ranks_desc_and_skips_cold() {
1237 let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
1238 for i in 0_u32..16 {
1240 let v = f64::from(i) * 0.01;
1241 p.process(&"a", [v, v]).unwrap();
1242 p.process(&"b", [v + 5.0, v + 5.0]).unwrap();
1243 p.process(&"c", [v + 100.0, v + 100.0]).unwrap();
1244 }
1245 p.process(&"d", [0.5, 0.5]).unwrap();
1247
1248 let out = p.score_across_tenants(&[50.0, 50.0]).unwrap();
1249 assert!(out.iter().all(|(k, _)| *k != "d"));
1251 for [a, b] in out.array_windows::<2>() {
1253 assert!(a.1.grade() >= b.1.grade());
1254 }
1255 }
1256
1257 #[test]
1258 fn score_across_tenants_empty_pool_returns_empty() {
1259 let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1260 let out = p.score_across_tenants(&[0.0, 0.0]).unwrap();
1261 assert!(out.is_empty());
1262 }
1263
1264 #[test]
1265 fn similarity_matrix_empty_pool_returns_empty() {
1266 let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1267 assert!(p.similarity_matrix(0).is_empty());
1268 }
1269
1270 #[test]
1271 fn similarity_matrix_skips_undertrained() {
1272 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1273 for i in 0_u32..64 {
1275 let v = f64::from(i) * 0.01;
1276 p.process(&"a", [v, v]).unwrap();
1277 }
1278 for i in 0_u32..8 {
1281 let v = f64::from(i) * 0.01;
1282 p.process(&"b", [v, v]).unwrap();
1283 }
1284 let pairs = p.similarity_matrix(32);
1285 assert!(pairs.is_empty(), "only A passed the min_obs threshold");
1286 }
1287
1288 #[test]
1289 fn most_similar_ranks_correctly() {
1290 let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
1291 for i in 0_u32..64 {
1293 let v = f64::from(i) * 0.01;
1294 p.process(&"a", [v, v]).unwrap();
1295 p.process(&"c", [v, v]).unwrap();
1296 p.process(&"b", [v + 10.0, v + 10.0]).unwrap();
1297 }
1298 let ranked = p.most_similar(&"a", 2, 1);
1299 assert_eq!(ranked.len(), 2);
1300 let c_sim = ranked.iter().find(|(k, _)| *k == "c").unwrap().1;
1302 let b_sim = ranked.iter().find(|(k, _)| *k == "b").unwrap().1;
1303 assert!(
1304 c_sim >= b_sim,
1305 "c similarity {c_sim} should be >= b {b_sim}"
1306 );
1307 }
1308
1309 #[test]
1310 fn most_similar_absent_key_returns_empty() {
1311 let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1312 assert!(p.most_similar(&"unknown", 3, 0).is_empty());
1313 }
1314
1315 #[test]
1316 fn isolation_between_tenants() {
1317 let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
1320 for i in 0_u32..32 {
1321 let v = f64::from(i) * 0.01;
1322 p.process(&"a", [v, v]).unwrap();
1323 p.process(&"b", [v, v]).unwrap();
1324 }
1325 for _ in 0..10 {
1327 p.process(&"a", [100.0, 100.0]).unwrap();
1328 }
1329 let a_threshold = p.peek(&"a").unwrap().current_threshold();
1330 let b_threshold = p.peek(&"b").unwrap().current_threshold();
1331 assert!(
1332 a_threshold > b_threshold,
1333 "tenant A threshold {a_threshold} should be > tenant B threshold {b_threshold}",
1334 );
1335 }
1336}