Skip to main content

anomstream_core/pool/
tenant_pool.rs

1//! Per-tenant pool of [`ThresholdedForest`] detectors.
2//!
3//! A single forest shared across every tenant pollutes baselines —
4//! tenant A's quiet traffic pushes tenant B's threshold down, and
5//! vice versa. [`TenantForestPool`] keeps one detector per tenant
6//! key, instantiated on demand, with an LRU eviction policy so the
7//! pool's footprint stays bounded even when the caller sees a long
8//! tail of one-off tenants.
9//!
10//! # Life cycle
11//!
12//! - **Construction**: [`TenantForestPool::new`] takes a `capacity`
13//!   (maximum simultaneous tenants) and a *factory* closure that
14//!   knows how to build a fresh detector. The factory is invoked
15//!   lazily — a tenant that never sends a point never allocates its
16//!   detector.
17//! - **Processing**: [`TenantForestPool::process`] looks up the
18//!   tenant, creating the detector with the factory if absent,
19//!   evicting the least-recently-used tenant when insertion would
20//!   push the pool past `capacity`.
21//! - **Inspection / migration**: [`TenantForestPool::iter`] and
22//!   [`TenantForestPool::iter_mut`] walk every live tenant; combine
23//!   with [`ThresholdedForest::to_path`] to checkpoint the whole
24//!   pool to disk.
25//!
26//! # Thread safety
27//!
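//! The pool is neither `Send` nor `Sync`, and every mutation takes
//! `&mut self`. The boxed factory is a bare `dyn Fn` trait object with
//! no `Send`/`Sync` bounds, so wrapping the pool in a
//! [`std::sync::Mutex`] does not make it shareable: `Mutex<T>` is only
//! `Sync` when `T: Send`. Keep each pool on the single thread that owns
//! it. For multi-threaded ingestion, shard tenants across several pools
//! (one per worker thread) and route each tenant's traffic to its shard.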
31
32use core::cmp::Ordering;
33use core::hash::Hash;
34use std::collections::{BinaryHeap, HashMap};
35use std::time::{Duration, Instant};
36
37use crate::bootstrap::BootstrapReport;
38use crate::domain::DiVector;
39use crate::error::{RcfError, RcfResult};
40use crate::thresholded::{AnomalyGrade, ThresholdedForest};
41
/// Factory closure type carried by the pool: each call builds one
/// fresh detector, or returns the builder's error.
///
/// The trait object carries no `Send`/`Sync` bounds, so a pool that
/// boxes one is itself neither `Send` nor `Sync`.
type ForestFactory<const D: usize> = dyn Fn() -> RcfResult<ThresholdedForest<D>>;
44
/// Stored per-tenant entry: the detector plus its LRU bookkeeping.
#[derive(Debug)]
struct TenantSlot<const D: usize> {
    /// Detector owned by this tenant. Boxing keeps this field
    /// pointer-sized regardless of `D`. When the map grows and
    /// relocates its entries, it therefore moves a few words per
    /// slot instead of the whole detector.
    forest: Box<ThresholdedForest<D>>,
    /// Logical access tick taken from the pool's access counter
    /// whenever the slot is touched. The minimum tick identifies
    /// the LRU victim.
    last_access: u64,
    /// Monotonic-clock (`Instant`) time of the last touch. It drives
    /// the [`TenantForestPool::evict_idle`] TTL path. It is kept
    /// separate from `last_access` so tick-based LRU ordering and
    /// time-based TTL expiry stay independent of each other.
    last_access_instant: Instant,
}
62
/// Bounded-heap entry used by [`TenantForestPool::most_similar`].
///
/// Entries are compared only by similarity. `K` does not participate,
/// so the heap works without requiring `K: Ord`. The ordering is
/// inverted so that a `BinaryHeap` (a max-heap by default) behaves as
/// a min-heap on similarity: `peek` returns the entry to evict when a
/// better candidate arrives.
struct MostSimilarHeapEntry<K: Clone> {
    /// Similarity score, `exp(-distance)`; in `(0, 1]` for finite
    /// stats.
    sim: f64,
    /// Tenant key owning this entry.
    key: K,
}
75
76impl<K: Clone> PartialEq for MostSimilarHeapEntry<K> {
77    fn eq(&self, other: &Self) -> bool {
78        self.sim == other.sim
79    }
80}
81
// Marker impl: `Ord` requires `Eq`. `Eq` promises `a == a` for every
// value, which raw `f64` `==` breaks for `NaN`. The `PartialEq` impl
// for this type has to account for that.
impl<K: Clone> Eq for MostSimilarHeapEntry<K> {}
83
84impl<K: Clone> PartialOrd for MostSimilarHeapEntry<K> {
85    fn partial_cmp(&self, other: &Self) -> Option<core::cmp::Ordering> {
86        Some(self.cmp(other))
87    }
88}
89
/// Aggregate readiness snapshot of a [`TenantForestPool`].
///
/// Intended for health-check / readiness-probe endpoints:
///
/// - `resident` = tenants currently in the pool
/// - `warming` = resident tenants whose TRCF has not yet reached
///   `min_observations` (`grade.ready()` would return `false`)
/// - `ready` = resident tenants whose TRCF *is* warm
///   (`grade.ready()` returns `true`)
/// - `capacity` = configured capacity ceiling
/// - `tenants_created_lifetime` / `tenants_evicted_lifetime` =
///   lifetime counters tracked internally, so a liveness endpoint
///   doesn't need to plumb a [`crate::MetricsSink`].
///
/// The invariant `warming + ready = resident` always holds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ReadinessSummary {
    /// Tenants currently held by the pool.
    pub resident: usize,
    /// Subset of `resident` that has not yet satisfied
    /// `min_observations`. These tenants' `process` calls return
    /// warming-up verdicts (`ready = false`).
    pub warming: usize,
    /// Subset of `resident` that *has* satisfied `min_observations`.
    pub ready: usize,
    /// Configured capacity ceiling of the pool.
    pub capacity: usize,
    /// Lifetime count of pool-factory invocations (fresh tenants).
    pub tenants_created_lifetime: u64,
    /// Lifetime count of evictions (LRU and TTL combined).
    pub tenants_evicted_lifetime: u64,
}
123
124impl ReadinessSummary {
125    /// Fraction of resident tenants that are warm (`ready / resident`)
126    /// — `1.0` when every tenant is ready, `0.0` when all are
127    /// warming, `NaN` on an empty pool (caller decides UX: report
128    /// `1.0` as "vacuously healthy" or surface the NaN).
129    #[must_use]
130    pub fn readiness_ratio(&self) -> f64 {
131        if self.resident == 0 {
132            #[allow(clippy::cast_precision_loss)]
133            return f64::NAN;
134        }
135        #[allow(clippy::cast_precision_loss)]
136        {
137            self.ready as f64 / self.resident as f64
138        }
139    }
140
141    /// Whether **every** resident tenant is ready. Vacuously `true`
142    /// on an empty pool — an empty pool has zero warming tenants.
143    #[must_use]
144    pub fn is_fully_ready(&self) -> bool {
145        self.warming == 0
146    }
147
148    /// Whether the pool has hit its capacity ceiling.
149    #[must_use]
150    pub fn is_at_capacity(&self) -> bool {
151        self.resident >= self.capacity
152    }
153}
154
155impl<K: Clone> Ord for MostSimilarHeapEntry<K> {
156    fn cmp(&self, other: &Self) -> core::cmp::Ordering {
157        // Inverted: larger `sim` → smaller cmp → sinks in the
158        // default max-heap → `peek` returns smallest sim.
159        other
160            .sim
161            .partial_cmp(&self.sim)
162            .unwrap_or(core::cmp::Ordering::Equal)
163    }
164}
165
/// Per-tenant pool of [`ThresholdedForest`] detectors.
///
/// `K` is the tenant key type. Typical choices: `String`,
/// `u64`, a small `enum`, or a newtype wrapping a UUID.
///
/// `D` is the per-point dimensionality, identical across every
/// tenant in the pool. Mixed-dimension pools are not supported
/// because each tenant's `const D: usize` is baked into the forest's
/// type.
///
/// The pool stores a boxed factory closure with no `Send`/`Sync`
/// bounds, so the pool itself is neither `Send` nor `Sync`. Keep it on
/// the thread that owns it.
///
/// # Examples
///
/// ```
/// use anomstream_core::{TenantForestPool, ThresholdedForestBuilder};
///
/// let mut pool: TenantForestPool<String, 2> = TenantForestPool::new(
///     4,
///     || ThresholdedForestBuilder::<2>::new()
///         .num_trees(50)
///         .sample_size(16)
///         .min_observations(4)
///         .seed(42)
///         .build(),
/// ).unwrap();
///
/// let verdict_a = pool.process(&"tenant-a".to_string(), [0.1, 0.2]).unwrap();
/// let verdict_b = pool.process(&"tenant-b".to_string(), [5.0, 5.0]).unwrap();
/// assert_eq!(pool.len(), 2);
/// # let _ = (verdict_a, verdict_b);
/// ```
pub struct TenantForestPool<K, const D: usize>
where
    K: Hash + Eq + Clone,
{
    /// Live per-tenant detectors keyed by tenant id.
    forests: HashMap<K, TenantSlot<D>>,
    /// Maximum number of tenants the pool holds at once.
    capacity: usize,
    /// Monotonic access counter that supplies each slot's LRU tick.
    /// It is bumped by `get`, `get_mut` and `insert` (even when the
    /// lookup misses), and by the auto-creating paths such as
    /// `process` and `score_only`.
    access_counter: u64,
    /// Lifetime total of pool-factory invocations (fresh tenants).
    /// Surfaced by [`TenantForestPool::readiness_summary`] so
    /// health-check endpoints don't need to plumb a metrics sink.
    tenants_created_lifetime: u64,
    /// Lifetime total of evictions (LRU and TTL, both paths).
    tenants_evicted_lifetime: u64,
    /// Factory used to build a detector when a tenant is seen for
    /// the first time (or after its entry has been evicted).
    factory: Box<ForestFactory<D>>,
    /// Observability sink for pool-level events (LRU evictions,
    /// resident-count gauge). Per-tenant detectors have their own
    /// sinks, which the caller can install inside the factory closure.
    metrics: std::sync::Arc<dyn crate::metrics::MetricsSink>,
}
221
222impl<K, const D: usize> core::fmt::Debug for TenantForestPool<K, D>
223where
224    K: Hash + Eq + Clone + core::fmt::Debug,
225{
226    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
227        // `factory` is a trait object without a Debug bound — emit
228        // its type-erased marker instead of the struct field.
229        f.debug_struct("TenantForestPool")
230            .field("capacity", &self.capacity)
231            .field("len", &self.forests.len())
232            .field("access_counter", &self.access_counter)
233            .field("tenants_created_lifetime", &self.tenants_created_lifetime)
234            .field("tenants_evicted_lifetime", &self.tenants_evicted_lifetime)
235            .field("tenants", &self.forests.keys().collect::<Vec<_>>())
236            .field("factory", &"<dyn Fn>")
237            .field("metrics", &self.metrics)
238            .finish()
239    }
240}
241
242impl<K, const D: usize> TenantForestPool<K, D>
243where
244    K: Hash + Eq + Clone,
245{
246    /// Build a pool bounded at `capacity` tenants.
247    ///
248    /// The factory is stored and invoked on every first-seen tenant —
249    /// it must be able to build a detector repeatedly, not just once.
250    /// Seed the factory's builder deterministically (or from a
251    /// per-tenant seed inside the closure) when reproducibility
252    /// matters.
253    ///
254    /// # Errors
255    ///
256    /// Returns [`RcfError::InvalidConfig`] when `capacity == 0`.
257    pub fn new<F>(capacity: usize, factory: F) -> RcfResult<Self>
258    where
259        F: Fn() -> RcfResult<ThresholdedForest<D>> + 'static,
260    {
261        if capacity == 0 {
262            return Err(RcfError::InvalidConfig(
263                "TenantForestPool capacity must be > 0".into(),
264            ));
265        }
266        Ok(Self {
267            forests: HashMap::with_capacity(capacity),
268            capacity,
269            access_counter: 0,
270            tenants_created_lifetime: 0,
271            tenants_evicted_lifetime: 0,
272            factory: Box::new(factory),
273            metrics: crate::metrics::default_sink(),
274        })
275    }
276
277    /// Install a [`crate::MetricsSink`] for pool-level events.
278    /// Emits `rcf_tenants_resident` gauge updates on every public
279    /// mutation and `rcf_tenant_evictions_total` on LRU evictions.
280    /// Per-tenant detector metrics are the factory's responsibility.
281    #[must_use]
282    pub fn with_metrics_sink(
283        mut self,
284        sink: std::sync::Arc<dyn crate::metrics::MetricsSink>,
285    ) -> Self {
286        #[allow(clippy::cast_precision_loss)]
287        sink.set_gauge(
288            crate::metrics::names::TENANTS_RESIDENT,
289            self.forests.len() as f64,
290        );
291        #[allow(clippy::cast_precision_loss)]
292        sink.set_gauge(crate::metrics::names::TENANT_CAPACITY, self.capacity as f64);
293        self.metrics = sink;
294        self
295    }
296
    /// Read-only handle to the installed pool-level sink: the default
    /// sink from `new`, or whatever `with_metrics_sink` installed.
    /// Clone the `Arc` to share it.
    #[must_use]
    pub fn metrics_sink(&self) -> &std::sync::Arc<dyn crate::metrics::MetricsSink> {
        &self.metrics
    }
302
303    /// Refresh the `rcf_tenants_resident` gauge — called internally
304    /// after every op that mutates the resident set.
305    fn emit_resident_gauge(&self) {
306        #[allow(clippy::cast_precision_loss)]
307        self.metrics.set_gauge(
308            crate::metrics::names::TENANTS_RESIDENT,
309            self.forests.len() as f64,
310        );
311    }
312
    /// Maximum number of tenants the pool holds simultaneously, as
    /// configured in [`Self::new`].
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.capacity
    }
318
    /// Current number of live (resident) tenants. Never exceeds
    /// [`Self::capacity`] after a public insertion path completes.
    #[must_use]
    pub fn len(&self) -> usize {
        self.forests.len()
    }
324
    /// Whether the pool currently holds no tenants.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.forests.is_empty()
    }
330
    /// Whether the tenant currently has a detector in the pool.
    /// This is a pure lookup: it does not touch the tenant's LRU tick.
    #[must_use]
    pub fn contains(&self, key: &K) -> bool {
        self.forests.contains_key(key)
    }
336
337    /// Read-only handle to a tenant's detector. Returns `None` when
338    /// the tenant has never processed a point or has been evicted.
339    /// Does **not** bump the LRU access counter so diagnostic tools
340    /// can inspect state without disturbing eviction order.
341    #[must_use]
342    pub fn peek(&self, key: &K) -> Option<&ThresholdedForest<D>> {
343        self.forests.get(key).map(|slot| slot.forest.as_ref())
344    }
345
346    /// Read-only handle with an LRU touch — the tenant is treated as
347    /// freshly accessed.
348    pub fn get(&mut self, key: &K) -> Option<&ThresholdedForest<D>> {
349        let tick = self.bump_access();
350        let now = Instant::now();
351        self.forests.get_mut(key).map(|slot| {
352            slot.last_access = tick;
353            slot.last_access_instant = now;
354            &*slot.forest
355        })
356    }
357
358    /// Mutable handle with an LRU touch. Prefer
359    /// [`Self::process`] / [`Self::score_only`] unless you need
360    /// direct access to [`ThresholdedForest`] methods not exposed by
361    /// the pool.
362    pub fn get_mut(&mut self, key: &K) -> Option<&mut ThresholdedForest<D>> {
363        let tick = self.bump_access();
364        let now = Instant::now();
365        self.forests.get_mut(key).map(|slot| {
366            slot.last_access = tick;
367            slot.last_access_instant = now;
368            slot.forest.as_mut()
369        })
370    }
371
372    /// Score a point through the tenant's detector and graduate it
373    /// through the adaptive threshold, creating the detector with
374    /// the factory if this is the first point for the tenant.
375    ///
376    /// If inserting would push the pool past `capacity` the
377    /// least-recently-used tenant is evicted first.
378    ///
379    /// # Errors
380    ///
381    /// Propagates factory errors (when a new detector must be built)
382    /// and [`ThresholdedForest::process`] errors (malformed point,
383    /// etc.).
384    pub fn process(&mut self, key: &K, point: [f64; D]) -> RcfResult<AnomalyGrade> {
385        self.touch_or_create(key)?.process(point)
386    }
387
388    /// Score a point against the tenant's detector without mutating
389    /// the underlying forest or its statistics. Creates the detector
390    /// on first use just like [`Self::process`] so the very first
391    /// call for a tenant is not surprising — the detector exists
392    /// but returns a warming-up verdict (no observations yet).
393    ///
394    /// # Errors
395    ///
396    /// Propagates factory errors and [`ThresholdedForest::score_only`]
397    /// errors.
398    pub fn score_only(&mut self, key: &K, point: &[f64; D]) -> RcfResult<AnomalyGrade> {
399        self.touch_or_create(key)?.score_only(point)
400    }
401
402    /// Per-feature attribution for a tenant's view of a point.
403    ///
404    /// # Errors
405    ///
406    /// Propagates factory errors and
407    /// [`ThresholdedForest::attribution`] errors.
408    pub fn attribution(&mut self, key: &K, point: &[f64; D]) -> RcfResult<DiVector> {
409        self.touch_or_create(key)?.attribution(point)
410    }
411
412    /// Bulk-score a batch of points through the tenant's detector
413    /// without creating the tenant on absence — retention-aware
414    /// read path. Returns `None` when the tenant is absent, or the
415    /// batch of graded verdicts otherwise.
416    ///
417    /// # Errors
418    ///
419    /// Propagates [`ThresholdedForest::score_only_many`] errors.
420    pub fn score_only_many(
421        &mut self,
422        key: &K,
423        points: &[[f64; D]],
424    ) -> RcfResult<Option<Vec<AnomalyGrade>>> {
425        match self.get_mut(key) {
426            Some(detector) => Ok(Some(detector.score_only_many(points)?)),
427            None => Ok(None),
428        }
429    }
430
431    /// Bulk early-termination scoring on a tenant's detector.
432    /// Auto-creates the tenant (consistent with `process`).
433    ///
434    /// # Errors
435    ///
436    /// Propagates [`ThresholdedForest::score_many_early_term`] errors.
437    pub fn score_many_early_term(
438        &mut self,
439        key: &K,
440        points: &[[f64; D]],
441        config: crate::early_term::EarlyTermConfig,
442    ) -> RcfResult<Vec<crate::early_term::EarlyTermScore>> {
443        self.touch_or_create(key)?
444            .score_many_early_term(points, config)
445    }
446
447    /// Cross-tenant what-if scoring — pipe the **same** `point`
448    /// through every resident tenant's detector and collect
449    /// `(key, grade)` pairs sorted by descending grade.
450    ///
451    /// Primary use case: MSSP / threat-intel lateral scan.
452    /// Analyst investigates an anomaly on tenant A, wants to see
453    /// which other tenants' baselines flag the same observation —
454    /// common pattern for supply-chain / shared-infra compromises.
455    ///
456    /// Tenants currently in the warming-up window
457    /// ([`crate::AnomalyGrade::ready`] returns `false`) are
458    /// skipped so callers only see confidence-bearing grades.
459    /// Does **not** auto-create any tenant. Does not mutate
460    /// detector state (read-only path).
461    ///
462    /// # Errors
463    ///
464    /// Propagates [`crate::ThresholdedForest::score_only`] errors.
465    pub fn score_across_tenants(&self, point: &[f64; D]) -> RcfResult<Vec<(K, AnomalyGrade)>>
466    where
467        K: Send + Sync,
468    {
469        #[cfg(feature = "parallel")]
470        let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = {
471            use rayon::prelude::*;
472            // Snapshot into a Vec of references so rayon can split.
473            let entries: Vec<(&K, &ThresholdedForest<D>)> = self
474                .forests
475                .iter()
476                .map(|(k, slot)| (k, slot.forest.as_ref()))
477                .collect();
478            entries
479                .par_iter()
480                .map(|(k, f)| -> RcfResult<Option<(K, AnomalyGrade)>> {
481                    let grade = f.score_only(point)?;
482                    if !grade.ready() {
483                        return Ok(None);
484                    }
485                    Ok(Some(((*k).clone(), grade)))
486                })
487                .collect()
488        };
489        #[cfg(not(feature = "parallel"))]
490        let collected: RcfResult<Vec<Option<(K, AnomalyGrade)>>> = self
491            .forests
492            .iter()
493            .map(|(k, slot)| -> RcfResult<Option<(K, AnomalyGrade)>> {
494                let grade = slot.forest.score_only(point)?;
495                if !grade.ready() {
496                    return Ok(None);
497                }
498                Ok(Some((k.clone(), grade)))
499            })
500            .collect();
501
502        let mut out: Vec<(K, AnomalyGrade)> = collected?.into_iter().flatten().collect();
503        out.sort_by(|a, b| {
504            b.1.grade()
505                .partial_cmp(&a.1.grade())
506                .unwrap_or(core::cmp::Ordering::Equal)
507        });
508        Ok(out)
509    }
510
511    /// Pairwise similarity between every tenant in the pool,
512    /// computed on each tenant's anomaly-score EMA stats
513    /// (`mean`, `stddev`). Tenants with fewer than
514    /// `min_observations` samples are skipped — their stats are
515    /// too noisy to compare.
516    ///
517    /// Similarity is `exp(-sqrt(Δmean² + Δstddev²))` ∈ `(0, 1]`:
518    /// identical distributions → `1.0`, unrelated → near `0`.
519    /// Returns `(key_a, key_b, similarity)` triples with
520    /// `key_a < key_b` ordering not guaranteed — callers that care
521    /// about a canonical order should sort their own slice.
522    #[must_use]
523    pub fn similarity_matrix(&self, min_observations: u64) -> Vec<(K, K, f64)>
524    where
525        K: Send + Sync,
526    {
527        let tenants: Vec<(&K, &ThresholdedForest<D>)> = self
528            .forests
529            .iter()
530            .filter_map(|(k, slot)| {
531                if slot.forest.stats().observations() >= min_observations {
532                    Some((k, slot.forest.as_ref()))
533                } else {
534                    None
535                }
536            })
537            .collect();
538        let n = tenants.len();
539
540        #[cfg(feature = "parallel")]
541        {
542            use rayon::prelude::*;
543            // Enumerate pairs sequentially (cheap — `n² / 2` usize
544            // tuples), par-iterate the work. Keeps `tenants`
545            // borrowed throughout instead of moving it into nested
546            // closures.
547            let mut pairs: Vec<(usize, usize)> = Vec::with_capacity(n * n / 2);
548            for i in 0..n {
549                for j in (i + 1)..n {
550                    pairs.push((i, j));
551                }
552            }
553            pairs
554                .par_iter()
555                .map(|&(i, j)| {
556                    let (k_a, f_a) = tenants[i];
557                    let (k_b, f_b) = tenants[j];
558                    let dm = f_a.stats().mean() - f_b.stats().mean();
559                    let ds = f_a.stats().stddev() - f_b.stats().stddev();
560                    let dist = (dm * dm + ds * ds).sqrt();
561                    (k_a.clone(), k_b.clone(), (-dist).exp())
562                })
563                .collect()
564        }
565        #[cfg(not(feature = "parallel"))]
566        {
567            let mut out = Vec::with_capacity(n * n / 2);
568            for (i, &(k_a, f_a)) in tenants.iter().enumerate() {
569                let mean_a = f_a.stats().mean();
570                let stddev_a = f_a.stats().stddev();
571                for &(k_b, f_b) in tenants.iter().skip(i + 1) {
572                    let dm = mean_a - f_b.stats().mean();
573                    let ds = stddev_a - f_b.stats().stddev();
574                    let dist = (dm * dm + ds * ds).sqrt();
575                    out.push((k_a.clone(), k_b.clone(), (-dist).exp()));
576                }
577            }
578            out
579        }
580    }
581
582    /// Top-`n` tenants most similar to `key`, sorted by descending
583    /// similarity. Excludes `key` itself and tenants below
584    /// `min_observations`. Returns an empty vec when `key` is
585    /// absent or the pool is otherwise empty.
586    ///
587    /// See [`Self::similarity_matrix`] for the similarity metric.
588    #[must_use]
589    pub fn most_similar(&self, key: &K, top_n: usize, min_observations: u64) -> Vec<(K, f64)> {
590        let Some(ref_slot) = self.forests.get(key) else {
591            return Vec::new();
592        };
593        let ref_stats = ref_slot.forest.stats();
594        if ref_stats.observations() < min_observations {
595            return Vec::new();
596        }
597        if top_n == 0 {
598            return Vec::new();
599        }
600
601        // Bounded min-heap: keep the `top_n` entries with the
602        // *highest* similarity. `MostSimilarHeapEntry::cmp` is
603        // inverted so `peek` returns the lowest-similarity entry —
604        // the one to evict when a better candidate arrives.
605        // O(N · log top_n) vs. the naive O(N · log N) sort path.
606        let mut heap: BinaryHeap<MostSimilarHeapEntry<K>> = BinaryHeap::with_capacity(top_n + 1);
607        for (k, slot) in &self.forests {
608            if k == key {
609                continue;
610            }
611            let stats = slot.forest.stats();
612            if stats.observations() < min_observations {
613                continue;
614            }
615            let dm = ref_stats.mean() - stats.mean();
616            let ds = ref_stats.stddev() - stats.stddev();
617            let dist = (dm * dm + ds * ds).sqrt();
618            let sim = (-dist).exp();
619            if heap.len() < top_n {
620                heap.push(MostSimilarHeapEntry {
621                    sim,
622                    key: k.clone(),
623                });
624            } else if let Some(min_entry) = heap.peek()
625                && sim > min_entry.sim
626            {
627                heap.pop();
628                heap.push(MostSimilarHeapEntry {
629                    sim,
630                    key: k.clone(),
631                });
632            }
633        }
634        // Drain heap and sort descending for caller-facing output.
635        let mut out: Vec<(K, f64)> = heap.into_iter().map(|e| (e.key, e.sim)).collect();
636        out.sort_unstable_by(|a, b| b.1.partial_cmp(&a.1).unwrap_or(Ordering::Equal));
637        out
638    }
639
640    /// Per-tenant imputation-like forensic baseline. Returns `None`
641    /// when the tenant is absent — does not auto-create (forensic
642    /// is a read path).
643    ///
644    /// # Errors
645    ///
646    /// Propagates [`ThresholdedForest::forensic_baseline`] errors.
647    pub fn forensic_baseline(
648        &mut self,
649        key: &K,
650        point: &[f64; D],
651    ) -> RcfResult<Option<crate::forensic::ForensicBaseline<D>>> {
652        match self.get_mut(key) {
653            Some(detector) => Ok(Some(detector.forensic_baseline(point)?)),
654            None => Ok(None),
655        }
656    }
657
658    /// Bulk per-feature attribution on a tenant's detector.
659    /// Auto-creates the tenant.
660    ///
661    /// # Errors
662    ///
663    /// Propagates [`ThresholdedForest::attribution_many`] errors.
664    pub fn attribution_many(&mut self, key: &K, points: &[[f64; D]]) -> RcfResult<Vec<DiVector>> {
665        self.touch_or_create(key)?.attribution_many(points)
666    }
667
668    /// Timestamped variant of [`Self::process`] — tags the freshly
669    /// inserted point with `timestamp` on the tenant's forest, so
670    /// [`Self::delete_before`] can retract history by age.
671    ///
672    /// # Errors
673    ///
674    /// Propagates [`ThresholdedForest::process_at`] errors.
675    pub fn process_at(
676        &mut self,
677        key: &K,
678        point: [f64; D],
679        timestamp: u64,
680    ) -> RcfResult<AnomalyGrade> {
681        self.touch_or_create(key)?.process_at(point, timestamp)
682    }
683
684    /// Retract every point older than `cutoff` from a tenant's
685    /// detector. Returns `Ok(0)` (without creating the tenant) when
686    /// the tenant is absent — retention paths must never spin up a
687    /// fresh detector.
688    ///
689    /// # Errors
690    ///
691    /// Propagates [`ThresholdedForest::delete_before`] errors.
692    pub fn delete_before(&mut self, key: &K, cutoff: u64) -> RcfResult<usize> {
693        match self.get_mut(key) {
694            Some(detector) => detector.delete_before(cutoff),
695            None => Ok(0),
696        }
697    }
698
699    /// Early-termination scoring on a tenant's detector. Auto-
700    /// creates the tenant (like [`Self::process`]) — cold-start
701    /// returns `EmptyForest`, just like
702    /// [`ThresholdedForest::score_early_term`].
703    ///
704    /// # Errors
705    ///
706    /// Propagates factory errors and
707    /// [`ThresholdedForest::score_early_term`] errors.
708    pub fn score_early_term(
709        &mut self,
710        key: &K,
711        point: &[f64; D],
712        config: crate::early_term::EarlyTermConfig,
713    ) -> RcfResult<crate::early_term::EarlyTermScore> {
714        self.touch_or_create(key)?.score_early_term(point, config)
715    }
716
717    /// Retract a previously-observed point from a tenant's forest by
718    /// its `point_idx`. Returns `Ok(false)` (and does not create the
719    /// tenant) when the tenant is absent — SOC retraction paths must
720    /// not silently spin up fresh detectors.
721    ///
722    /// # Errors
723    ///
724    /// Propagates [`ThresholdedForest::delete`] errors.
725    pub fn delete(&mut self, key: &K, point_idx: usize) -> RcfResult<bool> {
726        match self.get_mut(key) {
727            Some(detector) => detector.delete(point_idx),
728            None => Ok(false),
729        }
730    }
731
732    /// Retract every point whose stored value bit-matches `point`
733    /// for a given tenant. Returns `Ok(0)` (and does not create the
734    /// tenant) when the tenant is absent.
735    ///
736    /// # Errors
737    ///
738    /// Propagates [`ThresholdedForest::delete_by_value`] errors.
739    pub fn delete_by_value(&mut self, key: &K, point: &[f64; D]) -> RcfResult<usize> {
740        match self.get_mut(key) {
741            Some(detector) => detector.delete_by_value(point),
742            None => Ok(0),
743        }
744    }
745
746    /// Replay historical `points` into the tenant's detector before
747    /// any live traffic. Lazily instantiates the tenant (like
748    /// [`Self::process`]), then delegates to
749    /// [`ThresholdedForest::bootstrap`]. Returns a report summarising
750    /// ingestion.
751    ///
752    /// Use this when restarting a long-running agent: pull recent
753    /// per-tenant history from the upstream TSDB (`Prometheus`,
754    /// `Loki`, `InfluxDB`, parquet dump…), hand it to this method per
755    /// tenant, and the pool is hot before the live streaming pipeline
756    /// is switched back on — avoiding the per-tenant warmup coverage
757    /// hole.
758    ///
759    /// # Errors
760    ///
761    /// Propagates factory and
762    /// [`ThresholdedForest::bootstrap`] errors.
763    pub fn bootstrap<I>(&mut self, key: &K, points: I) -> RcfResult<BootstrapReport>
764    where
765        I: IntoIterator<Item = [f64; D]>,
766    {
767        self.touch_or_create(key)?.bootstrap(points)
768    }
769
770    /// Install a pre-built detector for `key`, replacing any
771    /// existing entry. Useful for warm reload — iterate a directory
772    /// of per-tenant snapshots and pump them back into a fresh pool.
773    ///
774    /// Returns the displaced detector if one was already resident.
775    /// When inserting a brand-new tenant would push the pool past
776    /// `capacity`, the least-recently-used tenant is evicted first.
777    pub fn insert(&mut self, key: K, forest: ThresholdedForest<D>) -> Option<ThresholdedForest<D>> {
778        let tick = self.bump_access();
779        let now = Instant::now();
780        if !self.forests.contains_key(&key) && self.forests.len() >= self.capacity {
781            self.evict_lru();
782        }
783        let previous = self
784            .forests
785            .insert(
786                key,
787                TenantSlot {
788                    forest: Box::new(forest),
789                    last_access: tick,
790                    last_access_instant: now,
791                },
792            )
793            .map(|slot| *slot.forest);
794        self.emit_resident_gauge();
795        previous
796    }
797
798    /// Drop the tenant's detector. Returns the detector so callers
799    /// can hand it back to the factory or persist it before release.
800    pub fn remove(&mut self, key: &K) -> Option<ThresholdedForest<D>> {
801        let out = self.forests.remove(key).map(|slot| *slot.forest);
802        if out.is_some() {
803            self.emit_resident_gauge();
804        }
805        out
806    }
807
808    /// Drop every tenant's detector.
809    pub fn clear(&mut self) {
810        self.forests.clear();
811        self.emit_resident_gauge();
812    }
813
814    /// Iterate `(key, detector)` pairs in an unspecified order —
815    /// use this for snapshot / migration.
816    pub fn iter(&self) -> impl Iterator<Item = (&K, &ThresholdedForest<D>)> + '_ {
817        self.forests.iter().map(|(k, slot)| (k, &*slot.forest))
818    }
819
820    /// Mutable iteration over `(key, detector)` pairs. Does not bump
821    /// any tenant's LRU tick — callers are assumed to be scanning
822    /// for bulk operations (save to disk, migrate, reset stats).
823    pub fn iter_mut(&mut self) -> impl Iterator<Item = (&K, &mut ThresholdedForest<D>)> + '_ {
824        self.forests
825            .iter_mut()
826            .map(|(k, slot)| (k, slot.forest.as_mut()))
827    }
828
829    /// Snapshot every tenant key currently held.
830    #[must_use]
831    pub fn tenants(&self) -> Vec<K> {
832        self.forests.keys().cloned().collect()
833    }
834
835    /// Snapshot aggregate readiness of the pool — how many tenants
836    /// are warm vs warming, capacity headroom, and lifetime
837    /// create/evict counters. Cheap `O(resident)` scan.
838    ///
839    /// Use for `/healthz` / `/readyz` endpoints: a pool with
840    /// `readiness_ratio() < 0.5` on a mature deployment typically
841    /// indicates either bootstrap lag or aggressive eviction
842    /// thrashing.
843    #[must_use]
844    pub fn readiness_summary(&self) -> ReadinessSummary {
845        let mut warming = 0_usize;
846        let mut ready = 0_usize;
847        for slot in self.forests.values() {
848            let cfg = slot.forest.thresholded_config();
849            if slot.forest.stats().observations() >= cfg.min_observations
850                && slot.forest.stats().stddev() > 0.0
851            {
852                ready = ready.saturating_add(1);
853            } else {
854                warming = warming.saturating_add(1);
855            }
856        }
857        ReadinessSummary {
858            resident: self.forests.len(),
859            warming,
860            ready,
861            capacity: self.capacity,
862            tenants_created_lifetime: self.tenants_created_lifetime,
863            tenants_evicted_lifetime: self.tenants_evicted_lifetime,
864        }
865    }
866
867    /// Evict the least-recently-used tenant explicitly. Returns the
868    /// evicted `(key, detector)` pair so callers can persist it
869    /// before release.
870    ///
871    /// Public so custom eviction strategies (time-based, manual
872    /// shedding, etc.) can drive the pool without going through the
873    /// auto-eviction path.
874    pub fn evict_lru(&mut self) -> Option<(K, ThresholdedForest<D>)> {
875        let victim_key = self
876            .forests
877            .iter()
878            .min_by_key(|(_, slot)| slot.last_access)
879            .map(|(k, _)| k.clone())?;
880        let slot = self.forests.remove(&victim_key)?;
881        self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
882        self.metrics
883            .inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
884        self.emit_resident_gauge();
885        Some((victim_key, *slot.forest))
886    }
887
888    /// Evict every tenant whose wall-clock last-access is older than
889    /// `ttl`. Returns the evicted `(key, detector)` pairs in
890    /// unspecified order so callers can persist them before release
891    /// (SOC retention windows, GDPR purge, cold-path archival).
892    ///
893    /// Orthogonal to [`Self::evict_lru`]: LRU sheds on *capacity
894    /// pressure*, `evict_idle` sheds on *wall-clock staleness*.
895    /// Call on a schedule (e.g. every minute from a background
896    /// task) — the pool mutation takes `&mut self` so the caller
897    /// owns the cadence.
898    ///
899    /// Bumps both the aggregate `rcf_tenant_evictions_total` counter
900    /// and the dedicated `rcf_tenant_idle_evictions_total` counter
901    /// so dashboards can distinguish pressure-driven churn from
902    /// retention-driven shedding.
903    pub fn evict_idle(&mut self, ttl: Duration) -> Vec<(K, ThresholdedForest<D>)> {
904        let now = Instant::now();
905        // Enumerate victims first so we do not iterate a map we
906        // are simultaneously mutating. Cheap — at most `capacity`
907        // entries, keyed on clone.
908        let victims: Vec<K> = self
909            .forests
910            .iter()
911            .filter_map(|(k, slot)| {
912                if now.saturating_duration_since(slot.last_access_instant) > ttl {
913                    Some(k.clone())
914                } else {
915                    None
916                }
917            })
918            .collect();
919        let mut evicted = Vec::with_capacity(victims.len());
920        for key in victims {
921            if let Some(slot) = self.forests.remove(&key) {
922                self.tenants_evicted_lifetime = self.tenants_evicted_lifetime.saturating_add(1);
923                self.metrics
924                    .inc_counter(crate::metrics::names::TENANT_EVICTIONS_TOTAL, 1);
925                self.metrics
926                    .inc_counter(crate::metrics::names::TENANT_IDLE_EVICTIONS_TOTAL, 1);
927                evicted.push((key, *slot.forest));
928            }
929        }
930        if !evicted.is_empty() {
931            self.emit_resident_gauge();
932        }
933        evicted
934    }
935
936    /// Bump the monotonic access counter and return the new value.
937    fn bump_access(&mut self) -> u64 {
938        self.access_counter = self.access_counter.saturating_add(1);
939        self.access_counter
940    }
941
942    /// Shared entry point for every access path: update the LRU
943    /// tick, invoke the factory on first-seen tenants, and evict
944    /// when the pool is full.
945    fn touch_or_create(&mut self, key: &K) -> RcfResult<&mut ThresholdedForest<D>> {
946        let tick = self.bump_access();
947        let now = Instant::now();
948        if !self.forests.contains_key(key) {
949            if self.forests.len() >= self.capacity {
950                self.evict_lru();
951            }
952            let forest = (self.factory)()?;
953            self.forests.insert(
954                key.clone(),
955                TenantSlot {
956                    forest: Box::new(forest),
957                    last_access: tick,
958                    last_access_instant: now,
959                },
960            );
961            self.tenants_created_lifetime = self.tenants_created_lifetime.saturating_add(1);
962            self.metrics
963                .inc_counter(crate::metrics::names::TENANT_CREATED_TOTAL, 1);
964            self.emit_resident_gauge();
965        }
966        // At this point the entry exists; bump its access stamp and
967        // return a mutable handle.
968        let slot = self.forests.get_mut(key).expect("tenant was just inserted");
969        slot.last_access = tick;
970        slot.last_access_instant = now;
971        Ok(slot.forest.as_mut())
972    }
973}
974
#[cfg(test)]
#[allow(clippy::float_cmp)] // Tests assert bounds on closed-form quantities.
mod tests {
    use super::*;
    use crate::ThresholdedForestBuilder;

    /// Small, deterministic 2-D detector factory shared by every test:
    /// 50 trees, 16-point samples, `min_observations = 4`, no threshold
    /// floor, fixed seed.
    fn factory_2d() -> impl Fn() -> RcfResult<ThresholdedForest<2>> {
        || {
            ThresholdedForestBuilder::<2>::new()
                .num_trees(50)
                .sample_size(16)
                .min_observations(4)
                .min_threshold(0.0)
                .seed(42)
                .build()
        }
    }

    #[test]
    fn new_rejects_zero_capacity() {
        let err = TenantForestPool::<String, 2>::new(0, factory_2d()).unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn new_accepts_capacity_one() {
        let p = TenantForestPool::<String, 2>::new(1, factory_2d()).unwrap();
        assert_eq!(p.capacity(), 1);
        assert_eq!(p.len(), 0);
        assert!(p.is_empty());
    }

    #[test]
    fn process_auto_creates_tenant() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        assert!(!p.contains(&"a"));
        p.process(&"a", [0.0, 0.0]).unwrap();
        assert!(p.contains(&"a"));
        assert_eq!(p.len(), 1);
    }

    #[test]
    fn process_evicts_lru_when_full() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        // Touch `a` so `b` becomes LRU.
        p.process(&"a", [0.1, 0.1]).unwrap();
        p.process(&"c", [2.0, 2.0]).unwrap();
        assert!(p.contains(&"a"));
        assert!(!p.contains(&"b"), "b should have been evicted");
        assert!(p.contains(&"c"));
        assert_eq!(p.len(), 2);
    }

    #[test]
    fn peek_does_not_update_lru() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"old", [0.0, 0.0]).unwrap();
        p.process(&"new", [1.0, 1.0]).unwrap();
        // peek `old` — should NOT prevent its eviction.
        let _ = p.peek(&"old");
        p.process(&"newer", [2.0, 2.0]).unwrap();
        assert!(!p.contains(&"old"), "peek should not refresh LRU");
    }

    #[test]
    fn get_does_update_lru() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"old", [0.0, 0.0]).unwrap();
        p.process(&"new", [1.0, 1.0]).unwrap();
        let _ = p.get(&"old");
        p.process(&"newer", [2.0, 2.0]).unwrap();
        assert!(p.contains(&"old"), "get should refresh LRU");
        assert!(!p.contains(&"new"), "new should be evicted instead");
    }

    #[test]
    fn remove_returns_detector() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        let detector = p.remove(&"a").unwrap();
        assert_eq!(detector.forest().num_trees(), 50);
        assert!(!p.contains(&"a"));
    }

    #[test]
    fn remove_returns_none_for_missing_tenant() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        assert!(p.remove(&"nope").is_none());
    }

    #[test]
    fn insert_replaces_existing() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        let fresh = (factory_2d())().unwrap();
        let old = p.insert("a", fresh).unwrap();
        assert_eq!(old.forest().num_trees(), 50);
        assert!(p.contains(&"a"));
        assert_eq!(p.len(), 1);
    }

    #[test]
    fn insert_evicts_when_full_and_key_new() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        let fresh = (factory_2d())().unwrap();
        p.insert("c", fresh);
        assert_eq!(p.len(), 2);
        assert!(p.contains(&"c"));
        // `a` was touched first, so it is the LRU victim; `b` survives.
        assert!(!p.contains(&"a"), "a was least recently used and should be evicted");
        assert!(p.contains(&"b"));
    }

    #[test]
    fn clear_drops_all_tenants() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        p.clear();
        assert!(p.is_empty());
    }

    #[test]
    fn iter_visits_every_tenant() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        let mut keys: Vec<&&str> = p.iter().map(|(k, _)| k).collect();
        keys.sort();
        assert_eq!(keys, vec![&"a", &"b"]);
    }

    #[test]
    fn evict_lru_returns_oldest_tenant() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        p.process(&"a", [0.1, 0.1]).unwrap();
        let (key, _) = p.evict_lru().unwrap();
        assert_eq!(key, "b");
    }

    #[test]
    fn evict_lru_on_empty_pool_returns_none() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        assert!(p.evict_lru().is_none());
    }

    #[test]
    fn evict_idle_retains_fresh_and_evicts_stale() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        // Sleep just long enough to make `a` and `b` older than the
        // TTL; then touch `a` so only `b` crosses the threshold.
        std::thread::sleep(std::time::Duration::from_millis(40));
        p.process(&"a", [0.1, 0.1]).unwrap();
        let evicted = p.evict_idle(std::time::Duration::from_millis(20));
        let evicted_keys: Vec<&&str> = evicted.iter().map(|(k, _)| k).collect();
        assert_eq!(evicted_keys, vec![&"b"]);
        assert!(p.contains(&"a"));
        assert!(!p.contains(&"b"));
    }

    #[test]
    fn evict_idle_empty_pool_returns_empty() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        let evicted = p.evict_idle(std::time::Duration::from_secs(1));
        assert!(evicted.is_empty());
    }

    #[test]
    fn readiness_summary_empty_pool() {
        let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        let s = p.readiness_summary();
        assert_eq!(s.resident, 0);
        assert_eq!(s.warming, 0);
        assert_eq!(s.ready, 0);
        assert_eq!(s.capacity, 4);
        assert_eq!(s.tenants_created_lifetime, 0);
        assert_eq!(s.tenants_evicted_lifetime, 0);
        assert!(s.is_fully_ready()); // vacuous
        assert!(!s.is_at_capacity());
        assert!(s.readiness_ratio().is_nan());
    }

    #[test]
    fn readiness_summary_counts_warming_and_ready() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        // Warm tenant "a" past min_observations (factory_2d default 4).
        for i in 0_u32..16 {
            let v = f64::from(i) * 0.01;
            p.process(&"a", [v, v]).unwrap();
        }
        // "b" only 2 obs — warming.
        p.process(&"b", [0.0, 0.0]).unwrap();
        p.process(&"b", [0.01, 0.01]).unwrap();
        let s = p.readiness_summary();
        assert_eq!(s.resident, 2);
        assert_eq!(s.ready, 1);
        assert_eq!(s.warming, 1);
        assert!((s.readiness_ratio() - 0.5).abs() < 1.0e-9);
        assert!(!s.is_fully_ready());
    }

    #[test]
    fn readiness_summary_tracks_lifetime_counters() {
        let mut p = TenantForestPool::<&'static str, 2>::new(2, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        p.process(&"c", [2.0, 2.0]).unwrap(); // evicts "a"
        let s = p.readiness_summary();
        assert_eq!(s.tenants_created_lifetime, 3);
        assert_eq!(s.tenants_evicted_lifetime, 1);
        assert_eq!(s.resident, 2);
        assert!(s.is_at_capacity());
    }

    #[test]
    fn evict_idle_zero_ttl_evicts_all_non_just_touched() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        // Small sleep so `now` advances past the per-slot instants.
        std::thread::sleep(std::time::Duration::from_millis(5));
        let evicted = p.evict_idle(std::time::Duration::from_millis(0));
        assert_eq!(evicted.len(), 2);
        assert!(p.is_empty());
    }

    #[test]
    fn factory_error_propagates() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, || {
            Err(RcfError::InvalidConfig("forced".into()))
        })
        .unwrap();
        let err = p.process(&"x", [0.0, 0.0]).unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
        assert!(p.is_empty(), "failed factory should not leave an entry");
    }

    #[test]
    fn score_only_auto_creates_but_leaves_stats_empty() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        let verdict = p.score_only(&"a", &[0.0, 0.0]).unwrap();
        assert!(!verdict.ready(), "brand-new detector should still be warming up");
        assert!(p.contains(&"a"));
    }

    #[test]
    fn tenants_returns_live_keys() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        p.process(&"a", [0.0, 0.0]).unwrap();
        p.process(&"b", [1.0, 1.0]).unwrap();
        let mut ts = p.tenants();
        ts.sort_unstable();
        assert_eq!(ts, vec!["a", "b"]);
    }

    #[test]
    fn score_across_tenants_ranks_desc_and_skips_cold() {
        let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
        // Warm tenants a, b, c past min_observations=4.
        for i in 0_u32..16 {
            let v = f64::from(i) * 0.01;
            p.process(&"a", [v, v]).unwrap();
            p.process(&"b", [v + 5.0, v + 5.0]).unwrap();
            p.process(&"c", [v + 100.0, v + 100.0]).unwrap();
        }
        // Tenant d: warming up only.
        p.process(&"d", [0.5, 0.5]).unwrap();

        let out = p.score_across_tenants(&[50.0, 50.0]).unwrap();
        // d skipped (not ready).
        assert!(out.iter().all(|(k, _)| *k != "d"));
        // Sorted desc.
        for [a, b] in out.array_windows::<2>() {
            assert!(a.1.grade() >= b.1.grade());
        }
    }

    #[test]
    fn score_across_tenants_empty_pool_returns_empty() {
        let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        let out = p.score_across_tenants(&[0.0, 0.0]).unwrap();
        assert!(out.is_empty());
    }

    #[test]
    fn similarity_matrix_empty_pool_returns_empty() {
        let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        assert!(p.similarity_matrix(0).is_empty());
    }

    #[test]
    fn similarity_matrix_skips_undertrained() {
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        // Tenant A: plenty of observations.
        for i in 0_u32..64 {
            let v = f64::from(i) * 0.01;
            p.process(&"a", [v, v]).unwrap();
        }
        // Tenant B: very few observations — should be skipped with
        // min_observations = 32.
        for i in 0_u32..8 {
            let v = f64::from(i) * 0.01;
            p.process(&"b", [v, v]).unwrap();
        }
        let pairs = p.similarity_matrix(32);
        assert!(
            pairs.is_empty(),
            "only A passed the min_obs threshold, so no pair can be formed"
        );
    }

    #[test]
    fn most_similar_ranks_correctly() {
        let mut p = TenantForestPool::<&'static str, 2>::new(8, factory_2d()).unwrap();
        // Three tenants: A and C with similar baseline, B different.
        for i in 0_u32..64 {
            let v = f64::from(i) * 0.01;
            p.process(&"a", [v, v]).unwrap();
            p.process(&"c", [v, v]).unwrap();
            p.process(&"b", [v + 10.0, v + 10.0]).unwrap();
        }
        let ranked = p.most_similar(&"a", 2, 1);
        assert_eq!(ranked.len(), 2);
        // c should rank above b (scores should be closer for same-baseline tenants).
        let c_sim = ranked.iter().find(|(k, _)| *k == "c").unwrap().1;
        let b_sim = ranked.iter().find(|(k, _)| *k == "b").unwrap().1;
        assert!(
            c_sim >= b_sim,
            "c similarity {c_sim} should be >= b {b_sim}"
        );
    }

    #[test]
    fn most_similar_absent_key_returns_empty() {
        let p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        assert!(p.most_similar(&"unknown", 3, 0).is_empty());
    }

    #[test]
    fn isolation_between_tenants() {
        // A big outlier for tenant A must not raise tenant B's
        // threshold. Each tenant owns its own stats.
        let mut p = TenantForestPool::<&'static str, 2>::new(4, factory_2d()).unwrap();
        for i in 0_u32..32 {
            let v = f64::from(i) * 0.01;
            p.process(&"a", [v, v]).unwrap();
            p.process(&"b", [v, v]).unwrap();
        }
        // Shock tenant A with an outlier many times.
        for _ in 0..10 {
            p.process(&"a", [100.0, 100.0]).unwrap();
        }
        let a_threshold = p.peek(&"a").unwrap().current_threshold();
        let b_threshold = p.peek(&"b").unwrap().current_threshold();
        assert!(
            a_threshold > b_threshold,
            "tenant A threshold {a_threshold} should be > tenant B threshold {b_threshold}",
        );
    }
}