Skip to main content

anomstream_core/thresholded/
config.rs

//! Configuration + builder for [`crate::ThresholdedForest`].
//!
//! [`ThresholdedConfig`] holds the parameters that govern the adaptive
//! threshold layer on top of the underlying [`crate::RandomCutForest`]:
//!
//! | Field | Role | Default |
//! |---|---|---|
//! | `z_factor` | Multiplier on the score stddev used to derive the threshold (`mean + z · stddev`). Only meaningful under [`ThresholdMode::ZSigma`]. | `3.0` |
//! | `threshold_mode` | Selects the statistic behind the threshold: the EMA `mean + z · stddev` or a streaming quantile of the scores. | `ZSigma { z_factor: 3.0 }` |
//! | `score_decay` | EMA smoothing factor for the running mean/variance of the anomaly scores. | `0.01` |
//! | `min_observations` | Samples required before the detector emits a non-warmup verdict. | `32` |
//! | `min_threshold` | Absolute floor on the adaptive threshold — prevents a near-zero stddev from firing on trivial jitter. | `1.0` |
//!
//! The builder mirrors [`crate::ForestBuilder`] so forest and threshold
//! parameters can be tuned side-by-side in one fluent chain.
15
16use alloc::format;
17
18use crate::config::ForestBuilder;
19use crate::error::{RcfError, RcfResult};
20use crate::thresholded::detector::ThresholdedForest;
21
/// Default multiplier for [`ThresholdMode::ZSigma`]: the threshold sits
/// 3 standard deviations above the running mean, in line with the AWS
/// `SageMaker` RCF guidance ("scores beyond 3σ are considered
/// anomalous"). Only meaningful under [`ThresholdMode::ZSigma`].
pub const DEFAULT_Z_FACTOR: f64 = 3.0;

/// Default tail quantile for [`ThresholdMode::Quantile`]. At `0.99`
/// about 1 % of scores cross the threshold in steady state, which
/// matches the typical SOC alert-rate budget.
pub const DEFAULT_QUANTILE: f64 = 0.99;

/// Default EMA smoothing factor applied to the anomaly-score stream;
/// `0.01` gives an effective memory window of ~100 points.
pub const DEFAULT_SCORE_DECAY: f64 = 0.01;

/// Default number of observations the detector must see before it
/// emits a non-warmup verdict.
pub const DEFAULT_MIN_OBSERVATIONS: u64 = 32;

/// Default absolute lower bound on the adaptive threshold.
pub const DEFAULT_MIN_THRESHOLD: f64 = 1.0;
44
/// Statistic from which the adaptive threshold is derived.
///
/// Isolation-depth scores are right-skewed and heavy-tailed rather
/// than Gaussian, so the `μ + z·σ` form systematically over-flags
/// during baseline calm periods and under-flags during drift.
/// [`ThresholdMode::Quantile`] instead keeps a streaming `TDigest` of
/// the score distribution and thresholds on the chosen tail
/// percentile, which tracks the caller's actual alert-rate budget
/// more closely (e.g. `p = 0.99` ≈ 1 % firing rate).
#[derive(Debug, Clone, Copy, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[non_exhaustive]
pub enum ThresholdMode {
    /// Classic `mean + z · stddev` computed on the EMA of the score
    /// stream. This is the back-compat default; prefer it for
    /// Gaussian-like scores (lag-embedded streams with symmetric
    /// noise).
    ZSigma {
        /// Multiplier applied to the EMA stddev.
        z_factor: f64,
    },
    /// Streaming quantile threshold: `threshold = TDigest.quantile(p)`
    /// over the observed scores. Robust to the isolation-depth
    /// right-skew and calibrated directly on the caller's alert-rate
    /// budget. `p` must lie in `(0, 1)`; typical values are `0.99` /
    /// `0.999`.
    Quantile {
        /// Quantile used as the threshold. A higher `p` gives a
        /// stricter threshold (fewer firings).
        p: f64,
    },
}
73
74impl Default for ThresholdMode {
75    fn default() -> Self {
76        Self::ZSigma {
77            z_factor: DEFAULT_Z_FACTOR,
78        }
79    }
80}
81
82/// Validated configuration of the adaptive-threshold layer.
83#[derive(Debug, Clone, Copy, PartialEq)]
84#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
85pub struct ThresholdedConfig {
86    /// Multiplier on the score stddev used to derive the adaptive
87    /// threshold when [`Self::threshold_mode`] is
88    /// [`ThresholdMode::ZSigma`]. Kept as a top-level field for
89    /// back-compat — callers constructing via struct literal get
90    /// the legacy behaviour without opt-in. Ignored under
91    /// [`ThresholdMode::Quantile`].
92    pub z_factor: f64,
93    /// Selects whether the threshold is driven by the EMA's
94    /// `mean + z·σ` or by a streaming quantile of the score
95    /// distribution. Defaults to [`ThresholdMode::ZSigma`].
96    #[cfg_attr(feature = "serde", serde(default))]
97    pub threshold_mode: ThresholdMode,
98    /// EMA smoothing factor on the score stream. Must be in `(0, 1]`.
99    pub score_decay: f64,
100    /// Samples required before the detector stops emitting
101    /// warming-up verdicts.
102    pub min_observations: u64,
103    /// Absolute floor on the adaptive threshold.
104    pub min_threshold: f64,
105}
106
107impl Default for ThresholdedConfig {
108    fn default() -> Self {
109        Self {
110            z_factor: DEFAULT_Z_FACTOR,
111            threshold_mode: ThresholdMode::default(),
112            score_decay: DEFAULT_SCORE_DECAY,
113            min_observations: DEFAULT_MIN_OBSERVATIONS,
114            min_threshold: DEFAULT_MIN_THRESHOLD,
115        }
116    }
117}
118
119impl ThresholdedConfig {
120    /// Validate the configuration.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`RcfError::InvalidConfig`] when any field is outside
125    /// its accepted range: `z_factor` must be finite and positive,
126    /// `score_decay` finite and in `(0, 1]`, `min_threshold` finite
127    /// and non-negative.
128    pub fn validate(&self) -> RcfResult<()> {
129        match self.threshold_mode {
130            ThresholdMode::ZSigma { z_factor } => {
131                if !z_factor.is_finite() || z_factor <= 0.0 {
132                    return Err(RcfError::InvalidConfig(
133                        format!("z_factor must be finite and > 0, got {z_factor}").into(),
134                    ));
135                }
136            }
137            ThresholdMode::Quantile { p } => {
138                if !p.is_finite() || !(0.0..1.0).contains(&p) || p <= 0.0 {
139                    return Err(RcfError::InvalidConfig(
140                        format!("Quantile p must be in (0.0, 1.0), got {p}").into(),
141                    ));
142                }
143            }
144        }
145        // The legacy `z_factor` field is still validated so callers
146        // building via struct literal (without touching
147        // `threshold_mode`) still get the strictness they used to.
148        if !self.z_factor.is_finite() || self.z_factor <= 0.0 {
149            return Err(RcfError::InvalidConfig(
150                format!("z_factor must be finite and > 0, got {}", self.z_factor).into(),
151            ));
152        }
153        if !self.score_decay.is_finite() || self.score_decay <= 0.0 || self.score_decay > 1.0 {
154            return Err(RcfError::InvalidConfig(
155                format!(
156                    "score_decay must be in (0.0, 1.0], got {}",
157                    self.score_decay
158                )
159                .into(),
160            ));
161        }
162        if !self.min_threshold.is_finite() || self.min_threshold < 0.0 {
163            return Err(RcfError::InvalidConfig(
164                format!(
165                    "min_threshold must be finite and >= 0, got {}",
166                    self.min_threshold
167                )
168                .into(),
169            ));
170        }
171        Ok(())
172    }
173}
174
175/// Fluent builder for [`ThresholdedForest`].
176///
177/// Wraps a [`ForestBuilder`] so callers configure the underlying
178/// forest and the threshold layer in one chain:
179///
180/// ```
181/// use anomstream_core::ThresholdedForestBuilder;
182///
183/// let detector = ThresholdedForestBuilder::<4>::new()
184///     .num_trees(50)
185///     .sample_size(64)
186///     .z_factor(3.0)
187///     .seed(42)
188///     .build()
189///     .unwrap();
190/// assert_eq!(detector.forest().num_trees(), 50);
191/// ```
192#[derive(Debug, Clone)]
193pub struct ThresholdedForestBuilder<const D: usize> {
194    /// Forest layer builder (forwarded to through explicit methods).
195    forest: ForestBuilder<D>,
196    /// Threshold layer configuration under construction.
197    thresholded: ThresholdedConfig,
198}
199
200impl<const D: usize> Default for ThresholdedForestBuilder<D> {
201    fn default() -> Self {
202        Self::new()
203    }
204}
205
206impl<const D: usize> ThresholdedForestBuilder<D> {
207    /// Start a new builder with AWS-conformant forest defaults and
208    /// the threshold defaults described in [`ThresholdedConfig`].
209    #[must_use]
210    pub fn new() -> Self {
211        Self {
212            forest: ForestBuilder::<D>::new(),
213            thresholded: ThresholdedConfig::default(),
214        }
215    }
216
217    /// Override the number of trees in the underlying forest.
218    #[must_use]
219    pub fn num_trees(mut self, n: usize) -> Self {
220        self.forest = self.forest.num_trees(n);
221        self
222    }
223
224    /// Override the per-tree reservoir size of the underlying forest.
225    #[must_use]
226    pub fn sample_size(mut self, s: usize) -> Self {
227        self.forest = self.forest.sample_size(s);
228        self
229    }
230
231    /// Override the reservoir time-decay factor of the underlying
232    /// forest (biases the reservoir toward recent points).
233    #[must_use]
234    pub fn time_decay(mut self, d: f64) -> Self {
235        self.forest = self.forest.time_decay(d);
236        self
237    }
238
239    /// Pin the forest RNG seed for reproducible runs.
240    #[must_use]
241    pub fn seed(mut self, seed: u64) -> Self {
242        self.forest = self.forest.seed(seed);
243        self
244    }
245
246    /// Request a dedicated rayon thread pool for the forest's parallel
247    /// paths. Requires the `parallel` cargo feature. See
248    /// [`ForestBuilder::num_threads`].
249    #[must_use]
250    pub fn num_threads(mut self, n: usize) -> Self {
251        self.forest = self.forest.num_threads(n);
252        self
253    }
254
255    /// Override the warmup admission fraction forwarded to each
256    /// per-tree reservoir. Forwarded to
257    /// [`ForestBuilder::initial_accept_fraction`].
258    #[must_use]
259    pub fn initial_accept_fraction(mut self, f: f64) -> Self {
260        self.forest = self.forest.initial_accept_fraction(f);
261        self
262    }
263
264    /// Set per-dimension multiplicative feature scales on the
265    /// underlying forest. Forwarded to
266    /// [`ForestBuilder::feature_scales`]. See that method for
267    /// semantics and validation rules.
268    #[must_use]
269    pub fn feature_scales(mut self, scales: [f64; D]) -> Self {
270        self.forest = self.forest.feature_scales(scales);
271        self
272    }
273
274    /// Override the threshold's z-factor. Implies
275    /// [`ThresholdMode::ZSigma`] — mutually exclusive with
276    /// [`Self::quantile_threshold`]; the last call wins.
277    #[must_use]
278    pub fn z_factor(mut self, z: f64) -> Self {
279        self.thresholded.z_factor = z;
280        self.thresholded.threshold_mode = ThresholdMode::ZSigma { z_factor: z };
281        self
282    }
283
284    /// Drive the threshold from a streaming quantile of the score
285    /// distribution instead of the Gaussian `μ + z·σ`. `p` is the
286    /// target tail quantile — `0.99` budgets ~1 % alert rate in
287    /// steady state, `0.999` ~0.1 %. Mutually exclusive with
288    /// [`Self::z_factor`]; the last call wins.
289    #[must_use]
290    pub fn quantile_threshold(mut self, p: f64) -> Self {
291        self.thresholded.threshold_mode = ThresholdMode::Quantile { p };
292        self
293    }
294
295    /// Override the EMA smoothing factor on the anomaly-score stream.
296    #[must_use]
297    pub fn score_decay(mut self, d: f64) -> Self {
298        self.thresholded.score_decay = d;
299        self
300    }
301
302    /// Override the number of samples the detector requires before
303    /// emitting a non-warmup verdict.
304    #[must_use]
305    pub fn min_observations(mut self, n: u64) -> Self {
306        self.thresholded.min_observations = n;
307        self
308    }
309
310    /// Override the absolute floor on the adaptive threshold.
311    #[must_use]
312    pub fn min_threshold(mut self, t: f64) -> Self {
313        self.thresholded.min_threshold = t;
314        self
315    }
316
317    /// Read-only access to the forest-layer builder.
318    #[must_use]
319    pub fn forest_builder(&self) -> &ForestBuilder<D> {
320        &self.forest
321    }
322
323    /// Read-only access to the threshold-layer configuration.
324    #[must_use]
325    pub fn thresholded_config(&self) -> &ThresholdedConfig {
326        &self.thresholded
327    }
328
329    /// Validate every parameter and build the detector.
330    ///
331    /// # Errors
332    ///
333    /// Propagates [`ForestBuilder::build`] errors and
334    /// [`ThresholdedConfig::validate`] errors.
335    #[must_use = "detector output should be checked — dropping it silently usually indicates a logic bug"]
336    pub fn build(self) -> RcfResult<ThresholdedForest<D>> {
337        self.thresholded.validate()?;
338        let forest = self.forest.build()?;
339        ThresholdedForest::<D>::from_parts(forest, self.thresholded)
340    }
341}
342
// Unit tests for the threshold configuration and its builder: default
// values, per-field validation in both threshold modes, and forwarding
// of builder overrides to the forest and threshold layers.
#[cfg(test)]
#[allow(clippy::float_cmp)] // Defaults compared bit-exactly against the module constants.
mod tests {
    use super::*;

    #[test]
    fn default_config_validates() {
        ThresholdedConfig::default().validate().unwrap();
    }

    #[test]
    fn default_config_fields_match_constants() {
        let c = ThresholdedConfig::default();
        assert_eq!(c.z_factor, DEFAULT_Z_FACTOR);
        assert_eq!(c.score_decay, DEFAULT_SCORE_DECAY);
        assert_eq!(c.min_observations, DEFAULT_MIN_OBSERVATIONS);
        assert_eq!(c.min_threshold, DEFAULT_MIN_THRESHOLD);
    }

    #[test]
    fn default_threshold_mode_is_zsigma() {
        assert_eq!(
            ThresholdMode::default(),
            ThresholdMode::ZSigma {
                z_factor: DEFAULT_Z_FACTOR
            }
        );
        assert_eq!(
            ThresholdedConfig::default().threshold_mode,
            ThresholdMode::default()
        );
    }

    /// Z-sigma configuration with the legacy field and the mode kept in sync.
    fn cfg(z: f64, decay: f64, min_obs: u64, min_thr: f64) -> ThresholdedConfig {
        ThresholdedConfig {
            z_factor: z,
            threshold_mode: ThresholdMode::ZSigma { z_factor: z },
            score_decay: decay,
            min_observations: min_obs,
            min_threshold: min_thr,
        }
    }

    /// Otherwise-default configuration switched to quantile mode at `p`.
    fn quantile_cfg(p: f64) -> ThresholdedConfig {
        ThresholdedConfig {
            threshold_mode: ThresholdMode::Quantile { p },
            ..ThresholdedConfig::default()
        }
    }

    #[test]
    fn validate_rejects_non_finite_z_factor() {
        assert!(
            cfg(f64::NAN, DEFAULT_SCORE_DECAY, 1, 0.0)
                .validate()
                .is_err()
        );
        assert!(
            cfg(f64::INFINITY, DEFAULT_SCORE_DECAY, 1, 0.0)
                .validate()
                .is_err()
        );
    }

    #[test]
    fn validate_rejects_non_positive_z_factor() {
        assert!(cfg(0.0, DEFAULT_SCORE_DECAY, 1, 0.0).validate().is_err());
        assert!(cfg(-1.0, DEFAULT_SCORE_DECAY, 1, 0.0).validate().is_err());
    }

    #[test]
    fn validate_rejects_score_decay_outside_range() {
        assert!(cfg(DEFAULT_Z_FACTOR, 0.0, 1, 0.0).validate().is_err());
        assert!(cfg(DEFAULT_Z_FACTOR, 1.5, 1, 0.0).validate().is_err());
        assert!(cfg(DEFAULT_Z_FACTOR, f64::NAN, 1, 0.0).validate().is_err());
    }

    #[test]
    fn validate_rejects_negative_min_threshold() {
        assert!(
            cfg(DEFAULT_Z_FACTOR, DEFAULT_SCORE_DECAY, 1, -0.001)
                .validate()
                .is_err()
        );
    }

    #[test]
    fn validate_accepts_quantile_inside_open_unit_interval() {
        quantile_cfg(DEFAULT_QUANTILE).validate().unwrap();
        quantile_cfg(0.5).validate().unwrap();
    }

    #[test]
    fn validate_rejects_quantile_outside_open_unit_interval() {
        // Both endpoints are excluded, as are non-finite values.
        for p in [0.0, 1.0, -0.1, 1.5, f64::NAN, f64::INFINITY] {
            assert!(
                quantile_cfg(p).validate().is_err(),
                "p = {p} must be rejected"
            );
        }
    }

    #[test]
    fn validate_checks_legacy_z_factor_under_quantile_mode() {
        // The top-level `z_factor` is validated in every mode.
        let c = ThresholdedConfig {
            z_factor: 0.0,
            ..quantile_cfg(DEFAULT_QUANTILE)
        };
        assert!(c.validate().is_err());
    }

    #[test]
    fn builder_defaults_pass_validation() {
        let b = ThresholdedForestBuilder::<4>::new();
        b.thresholded_config().validate().unwrap();
        b.forest_builder().config().validate().unwrap();
    }

    #[test]
    fn builder_overrides_apply_to_both_layers() {
        let b = ThresholdedForestBuilder::<4>::new()
            .num_trees(150)
            .sample_size(128)
            .z_factor(2.5)
            .score_decay(0.05)
            .min_observations(10)
            .min_threshold(0.5)
            .initial_accept_fraction(0.125)
            .seed(7);
        assert_eq!(b.forest_builder().config().num_trees, 150);
        assert_eq!(b.forest_builder().config().sample_size, 128);
        assert_eq!(b.forest_builder().config().seed, Some(7));
        assert!((b.forest_builder().config().initial_accept_fraction - 0.125).abs() < f64::EPSILON);
        assert_eq!(b.thresholded_config().z_factor, 2.5);
        assert_eq!(
            b.thresholded_config().threshold_mode,
            ThresholdMode::ZSigma { z_factor: 2.5 }
        );
        assert_eq!(b.thresholded_config().score_decay, 0.05);
        assert_eq!(b.thresholded_config().min_observations, 10);
        assert_eq!(b.thresholded_config().min_threshold, 0.5);
    }

    #[test]
    fn builder_threshold_mode_last_call_wins() {
        let b = ThresholdedForestBuilder::<4>::new()
            .z_factor(2.5)
            .quantile_threshold(0.999);
        assert_eq!(
            b.thresholded_config().threshold_mode,
            ThresholdMode::Quantile { p: 0.999 }
        );
        // Switching to quantile mode leaves the legacy field untouched.
        assert_eq!(b.thresholded_config().z_factor, 2.5);

        let b = b.z_factor(4.0);
        assert_eq!(
            b.thresholded_config().threshold_mode,
            ThresholdMode::ZSigma { z_factor: 4.0 }
        );
        assert_eq!(b.thresholded_config().z_factor, 4.0);
    }

    #[test]
    fn builder_build_validates_forest_layer() {
        let err = ThresholdedForestBuilder::<4>::new()
            .num_trees(10)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn builder_build_validates_threshold_layer() {
        let err = ThresholdedForestBuilder::<4>::new()
            .z_factor(-1.0)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }

    #[test]
    fn builder_build_rejects_invalid_quantile() {
        let err = ThresholdedForestBuilder::<4>::new()
            .quantile_threshold(1.0)
            .build()
            .unwrap_err();
        assert!(matches!(err, RcfError::InvalidConfig(_)));
    }
}