Skip to main content

anomstream_core/
metrics.rs

1//! Metrics sink abstraction for observability wiring.
2//!
3//! [`MetricsSink`] is a narrow trait exposed by the crate so
4//! long-running agents can drain counters / gauges / histograms
5//! into `Prometheus`, `StatsD`, `OpenTelemetry`, or any other aggregator
6//! without the anomstream-core internals pulling a concrete metrics crate.
//! Three event types cover everything the forest / thresholded /
//! pool / meta-drift layers — and the other detector families listed
//! in [`names`] — emit:
9//!
10//! - **counter** — monotonically increasing event tallies
11//!   (`rcf_updates_total`, `rcf_anomalies_fired_total`, …).
12//! - **gauge** — point-in-time values (`rcf_tenants_resident`,
13//!   `rcf_threshold_current`, …).
14//! - **histogram observation** — an `f64` sample the sink should
15//!   bucket on its own (`rcf_score`, `rcf_grade`, …).
16//!
//! Implementations must be `Send + Sync` so the sink can be shared
//! across threads — every detector type holds an `Arc<dyn
//! MetricsSink>` — and must also implement [`core::fmt::Debug`], which
//! [`MetricsSink`] declares as a supertrait. [`NoopSink`] is the default
//! zero-cost fallback (every call is a `#[inline]` no-op).
21//!
22//! # Wiring
23//!
24//! Every detector exposes a consuming builder method
25//! `.with_metrics_sink(Arc<dyn MetricsSink>)` that installs a sink.
26//! For [`crate::TenantForestPool`] the sink applies to the pool
27//! itself (tenant evictions, resident count); per-tenant detectors
28//! inherit nothing automatically — callers who want per-tenant
29//! observability should install a sink on each detector through the
30//! pool's factory closure.
31
32#[cfg(feature = "std")]
33use std::sync::{Arc, LazyLock};
34
/// Observability surface every anomstream-core detector reports into.
///
/// Each method receives a metric `name` (the crate's canonical
/// identifiers live in [`names`]) together with the value to record.
/// Implementors must be shareable across threads (`Send + Sync`) and
/// printable via [`core::fmt::Debug`].
pub trait MetricsSink: Send + Sync + core::fmt::Debug {
    /// Bump the monotonic counter `name` by `value`.
    fn inc_counter(&self, name: &str, value: u64);

    /// Overwrite the gauge `name` with `value`.
    fn set_gauge(&self, name: &str, value: f64);

    /// Feed one histogram sample `value` into the series `name`.
    fn observe_histogram(&self, name: &str, value: f64);
}
44
45/// Zero-cost [`MetricsSink`] implementation — every call is an
46/// inlined no-op. Default sink every detector ships with.
47#[derive(Debug, Clone, Copy, Default)]
48pub struct NoopSink;
49
50impl MetricsSink for NoopSink {
51    #[inline]
52    fn inc_counter(&self, _name: &str, _value: u64) {}
53    #[inline]
54    fn set_gauge(&self, _name: &str, _value: f64) {}
55    #[inline]
56    fn observe_histogram(&self, _name: &str, _value: f64) {}
57}
58
/// Lazily built, process-wide [`NoopSink`] handle that every
/// [`default_sink`] call clones.
///
/// It is constructed on first access and reused afterwards. Detector,
/// sampler and channel constructors that fall back to the no-op sink
/// therefore never allocate a fresh `Arc<NoopSink>`: each handout is a
/// single atomic refcount increment.
#[cfg(feature = "std")]
static SHARED_NOOP_SINK: LazyLock<Arc<dyn MetricsSink>> =
    LazyLock::new(|| -> Arc<dyn MetricsSink> { Arc::new(NoopSink) });
66
/// Hand out a clone of the shared, lazily initialised no-op sink.
///
/// Cloning the static only bumps its reference count, so constructors
/// on hot paths avoid a heap allocation. The result is already the
/// `Arc<dyn MetricsSink>` handle that detectors store, so it can be
/// installed directly.
#[cfg(feature = "std")]
#[must_use]
pub fn default_sink() -> Arc<dyn MetricsSink> {
    let shared: &Arc<dyn MetricsSink> = &SHARED_NOOP_SINK;
    Arc::clone(shared)
}
77
/// Recording [`MetricsSink`] for tests and benchmarks.
///
/// Every counter increment, gauge write and histogram sample is stored
/// behind a mutex. A scenario can then assert on exactly what a forest
/// or detector emitted.
#[cfg(feature = "std")]
#[derive(Default, Debug)]
pub struct TestSink {
    /// Mutex-guarded event log; see [`TestSinkInner`].
    inner: std::sync::Mutex<TestSinkInner>,
}
88
/// Event log kept by a [`TestSink`]; [`TestSink::snapshot`] hands out a
/// clone of it.
#[cfg(feature = "std")]
#[derive(Clone, Default, Debug)]
pub struct TestSinkInner {
    /// Running (saturating) total per counter name.
    pub counters: std::collections::HashMap<String, u64>,
    /// Most recently written value per gauge name.
    pub gauges: std::collections::HashMap<String, f64>,
    /// Every sample per histogram name, in the order it arrived.
    pub histograms: std::collections::HashMap<String, Vec<f64>>,
}
100
#[cfg(feature = "std")]
impl TestSink {
    /// Create a sink that has not recorded anything yet.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Return a clone of every counter, gauge and histogram recorded
    /// so far.
    ///
    /// # Panics
    ///
    /// Panics when the internal mutex is poisoned. That only happens if
    /// another thread panicked mid-record; recovering from it is left to
    /// the test that caused it.
    #[must_use]
    pub fn snapshot(&self) -> TestSinkInner {
        let guard = self.lock_inner();
        TestSinkInner::clone(&guard)
    }

    /// Accumulated total of counter `name`, or `0` if it was never
    /// incremented.
    ///
    /// # Panics
    ///
    /// Panics when the internal mutex is poisoned.
    #[must_use]
    pub fn counter(&self, name: &str) -> u64 {
        self.lock_inner()
            .counters
            .get(name)
            .copied()
            .unwrap_or_default()
    }

    /// Most recent value written to gauge `name`, or `None` if it was
    /// never set.
    ///
    /// # Panics
    ///
    /// Panics when the internal mutex is poisoned.
    #[must_use]
    pub fn gauge(&self, name: &str) -> Option<f64> {
        let inner = self.lock_inner();
        inner.gauges.get(name).copied()
    }

    /// Owned copy of every sample observed under histogram `name`, in
    /// arrival order. The result is empty when nothing was recorded.
    ///
    /// # Panics
    ///
    /// Panics when the internal mutex is poisoned.
    #[must_use]
    pub fn histogram(&self, name: &str) -> Vec<f64> {
        let inner = self.lock_inner();
        inner.histograms.get(name).map_or_else(Vec::new, Vec::clone)
    }

    /// Acquire the event-log mutex.
    ///
    /// A poisoned lock becomes a panic with a descriptive message rather
    /// than a bare `unwrap` failure. Every public accessor documents this
    /// under `# Panics`.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, TestSinkInner> {
        let locked = self.inner.lock();
        locked.expect("TestSink mutex poisoned — another thread panicked holding it")
    }
}
165
#[cfg(feature = "std")]
impl MetricsSink for TestSink {
    /// Add `value` to the running total for `name`, saturating at
    /// `u64::MAX` instead of wrapping.
    fn inc_counter(&self, name: &str, value: u64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        // Steady state is one hash lookup with no allocation; an owned
        // key is only built the first time `name` is seen.
        if let Some(total) = inner.counters.get_mut(name) {
            *total = total.saturating_add(value);
        } else {
            inner.counters.insert(name.to_owned(), value);
        }
    }

    /// Replace the stored value of gauge `name` with `value`.
    fn set_gauge(&self, name: &str, value: f64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        // Overwrite in place when the gauge exists, avoiding a throwaway
        // `String` key on every update.
        if let Some(slot) = inner.gauges.get_mut(name) {
            *slot = value;
        } else {
            inner.gauges.insert(name.to_owned(), value);
        }
    }

    /// Append `value` to the histogram series `name`, preserving
    /// arrival order.
    fn observe_histogram(&self, name: &str, value: f64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        if let Some(samples) = inner.histograms.get_mut(name) {
            samples.push(value);
        } else {
            inner.histograms.insert(name.to_owned(), vec![value]);
        }
    }
}
190
/// Stable metric identifiers used by anomstream-core and the companion
/// `anomstream_hotpath` / `anomstream_triage` components. They are
/// published as `&str` constants so dashboards can reference them
/// without hard-coding strings.
///
/// # `SemVer` guarantee
///
/// The string assigned to each constant here (`UPDATES_TOTAL`,
/// `PROCESS_TOTAL`, …) is part of the crate's **`SemVer`-stable**
/// surface and does not change in a patch or minor release. SOC
/// dashboards, Prometheus recording rules and Grafana alert expressions
/// that use either the identifier or its string value therefore survive
/// every non-major upgrade.
///
/// Adding new metrics is always allowed. Renaming an existing one
/// requires a major release plus a documented migration.
///
/// # Why `&str` instead of an enum
///
/// [`MetricsSink::inc_counter`] and its siblings accept `&str`, not a
/// closed `MetricName` enum. This lets a single sink also carry metric
/// names that originate outside this crate: service-level rollups,
/// tenant-scoped names, dynamic labels.
///
/// A closed enum would leave integrators choosing between forking the
/// crate and running a parallel dynamic fallback. Constant strings keep
/// the `SemVer` commitment on the identifiers in this module (see the
/// guarantee above) without that API cost.
pub mod names {
    // --- core counters -------------------------------------------

    /// Counter: bumped on each [`crate::RandomCutForest::update`] call.
    pub const UPDATES_TOTAL: &str = "rcf_updates_total";
    /// Counter: bumped on each [`crate::ThresholdedForest::process`] call.
    pub const PROCESS_TOTAL: &str = "rcf_process_total";
    /// Counter: [`crate::ThresholdedForest::process`] calls whose verdict
    /// came back flagged `is_anomaly`.
    pub const ANOMALIES_FIRED_TOTAL: &str = "rcf_anomalies_fired_total";
    /// Counter: [`crate::MetaDriftDetector::observe`] calls returning
    /// `Some(DriftKind::*)`. Upward and downward fires are summed.
    pub const DRIFT_FIRES_TOTAL: &str = "rcf_drift_fires_total";
    /// Counter: upward CUSUM drift fires (`DriftKind::Upward`).
    pub const DRIFT_UP_TOTAL: &str = "rcf_drift_up_total";
    /// Counter: downward CUSUM drift fires (`DriftKind::Downward`).
    pub const DRIFT_DOWN_TOTAL: &str = "rcf_drift_down_total";
    /// Counter: [`crate::RandomCutForest::delete`] calls that really
    /// removed a point.
    pub const DELETES_TOTAL: &str = "rcf_deletes_total";
    /// Counter: [`crate::RandomCutForest::attribution`] calls that
    /// completed successfully.
    pub const ATTRIBUTION_TOTAL: &str = "rcf_attribution_total";
    /// Counter: public calls rejected because the input held a
    /// non-finite component (NaN / ±inf), counted once per rejected
    /// call. Serves as an upstream data-quality signal for SOC
    /// dashboards.
    pub const REJECTED_NAN_TOTAL: &str = "rcf_rejected_nan_total";
    /// Counter: [`crate::RandomCutForest::score_early_term`] calls that
    /// stopped before visiting all `num_trees`. Divide by the call
    /// site's total to get the early-stop ratio.
    pub const EARLY_TERM_STOPPED_TOTAL: &str = "rcf_early_term_stopped_total";
    /// Counter: all [`crate::TenantForestPool`] evictions, LRU and TTL
    /// paths together. Compare with [`TENANT_IDLE_EVICTIONS_TOTAL`] to
    /// isolate the pressure-driven share.
    pub const TENANT_EVICTIONS_TOTAL: &str = "rcf_tenant_evictions_total";
    /// Counter: idle / TTL evictions performed by
    /// [`crate::TenantForestPool::evict_idle`]; a subset of
    /// [`TENANT_EVICTIONS_TOTAL`].
    pub const TENANT_IDLE_EVICTIONS_TOTAL: &str = "rcf_tenant_idle_evictions_total";
    /// Counter: pool-factory invocations, i.e. a new tenant joined the
    /// resident set. Tracked apart from `TENANT_EVICTIONS_TOTAL` so
    /// churn (create − evict) is visible.
    pub const TENANT_CREATED_TOTAL: &str = "rcf_tenant_created_total";
    /// Counter: bootstrap / replay points ingested successfully.
    pub const BOOTSTRAP_POINTS_TOTAL: &str = "rcf_bootstrap_points_total";
    /// Counter: bootstrap / replay points dropped as non-finite.
    pub const BOOTSTRAP_SKIPPED_TOTAL: &str = "rcf_bootstrap_skipped_total";
    /// Counter: bumped on each `FeatureDriftDetector::observe` call.
    pub const FEATURE_DRIFT_OBSERVED_TOTAL: &str = "rcf_feature_drift_observed_total";
    /// Counter: bumped on each `AlertClusterer::observe` call. This is
    /// every ingested alert, before deduplication.
    pub const ALERTS_OBSERVED_TOTAL: &str = "rcf_alerts_observed_total";
    /// Counter: alerts that started a new cluster because no existing
    /// cluster cleared the similarity threshold.
    pub const ALERT_CLUSTERS_NEW_TOTAL: &str = "rcf_alert_clusters_new_total";
    /// Counter: alerts folded into an existing cluster (the dedup win).
    /// Divide by `ALERTS_OBSERVED_TOTAL` for the dedup ratio.
    pub const ALERT_CLUSTERS_JOINED_TOTAL: &str = "rcf_alert_clusters_joined_total";
    /// Counter: clusters dropped once their newest alert slid out of the
    /// window.
    pub const ALERT_CLUSTERS_PRUNED_TOTAL: &str = "rcf_alert_clusters_pruned_total";

    // --- core gauges ---------------------------------------------

    /// Gauge: tree count of a forest.
    pub const FOREST_TREES: &str = "rcf_forest_trees";
    /// Gauge: the live adaptive threshold
    /// ([`crate::ThresholdedForest::current_threshold`]).
    pub const THRESHOLD_CURRENT: &str = "rcf_threshold_current";
    /// Gauge: EMA mean of a `ThresholdedForest` score stream.
    pub const EMA_MEAN: &str = "rcf_ema_mean";
    /// Gauge: EMA standard deviation of a `ThresholdedForest` score
    /// stream.
    pub const EMA_STDDEV: &str = "rcf_ema_stddev";
    /// Gauge: observations absorbed by the TRCF score-stream EMA.
    /// Against the configured `min_observations`, this shows how far a
    /// fresh detector is through warmup.
    pub const OBSERVATIONS_SEEN: &str = "rcf_observations_seen";
    /// Gauge: resident tenant count of a [`crate::TenantForestPool`],
    /// refreshed after each public operation.
    pub const TENANTS_RESIDENT: &str = "rcf_tenants_resident";
    /// Gauge: configured tenant capacity of a
    /// [`crate::TenantForestPool`]. It is fixed once built, but exposing
    /// it lets dashboards plot the `TENANTS_RESIDENT / TENANT_CAPACITY`
    /// pressure ratio.
    pub const TENANT_CAPACITY: &str = "rcf_tenant_capacity";
    /// Gauge: live cluster count of an `AlertClusterer`.
    pub const ALERT_CLUSTERS_ACTIVE: &str = "rcf_alert_clusters_active";
    /// Gauge: largest per-dimension PSI of a `FeatureDriftDetector`,
    /// written on each `psi()` call.
    pub const FEATURE_DRIFT_MAX_PSI: &str = "rcf_feature_drift_max_psi";

    // --- core histograms -----------------------------------------

    /// Histogram: raw anomaly score of each scored point.
    pub const SCORE_OBSERVATION: &str = "rcf_score";
    /// Histogram: graded verdict in `[0, 1]` of each processed point.
    pub const GRADE_OBSERVATION: &str = "rcf_grade";
    /// Histogram: upward CUSUM accumulator, sampled after each
    /// observation.
    pub const DRIFT_S_HIGH: &str = "rcf_drift_s_high";
    /// Histogram: downward CUSUM accumulator, sampled after each
    /// observation.
    pub const DRIFT_S_LOW: &str = "rcf_drift_s_low";
    /// Histogram: trees visited by each
    /// [`crate::RandomCutForest::score_early_term`] call. Combine with
    /// [`FOREST_TREES`] to chart the latency-savings distribution.
    pub const EARLY_TERM_TREES: &str = "rcf_early_term_trees";

    // --- hot_path ------------------------------------------------

    /// Counter: `anomstream_hotpath::UpdateSampler` `accept_*` calls that
    /// let the offer through.
    pub const HOT_PATH_SAMPLER_ACCEPTED_TOTAL: &str = "rcf_hot_path_sampler_accepted_total";
    /// Counter: `anomstream_hotpath::UpdateSampler` `accept_*` calls that
    /// turned the offer away (stride / per-flow-hash residue mismatch).
    pub const HOT_PATH_SAMPLER_REJECTED_TOTAL: &str = "rcf_hot_path_sampler_rejected_total";
    /// Counter: points queued by a successful
    /// `anomstream_hotpath::UpdateProducer::try_enqueue` call.
    pub const HOT_PATH_QUEUE_ENQUEUED_TOTAL: &str = "rcf_hot_path_queue_enqueued_total";
    /// Counter: points discarded because the hot-path MPSC queue was
    /// full. A non-zero value means the classifier outpaces the updater.
    pub const HOT_PATH_QUEUE_DROPPED_TOTAL: &str = "rcf_hot_path_queue_dropped_total";
    /// Counter: `anomstream_hotpath::PrefixRateCap::check_and_record`
    /// calls that let the offer through.
    pub const HOT_PATH_PREFIX_ADMITTED_TOTAL: &str = "rcf_hot_path_prefix_admitted_total";
    /// Counter: `anomstream_hotpath::PrefixRateCap::check_and_record`
    /// calls that capped the offer because its bucket was at the limit.
    pub const HOT_PATH_PREFIX_CAPPED_TOTAL: &str = "rcf_hot_path_prefix_capped_total";

    // --- drift_aware ---------------------------------------------

    /// Counter: shadow → primary promotions in a
    /// [`crate::DriftAwareForest`].
    pub const DRIFT_AWARE_SWAPS_TOTAL: &str = "rcf_drift_aware_swaps_total";
    /// Counter: [`crate::DriftAwareForest::on_drift`] calls that did
    /// spawn a shadow, i.e. were not gated out by `min_primary_age` or
    /// an in-flight shadow.
    pub const DRIFT_AWARE_ON_DRIFT_TOTAL: &str = "rcf_drift_aware_on_drift_total";
    /// Gauge: `1.0` while a shadow warms up, otherwise `0.0`.
    pub const DRIFT_AWARE_SHADOW_ACTIVE: &str = "rcf_drift_aware_shadow_active";

    // --- adwin ---------------------------------------------------

    /// Counter: finite values absorbed into an
    /// [`crate::AdwinDetector::update`] window.
    pub const ADWIN_OBSERVED_TOTAL: &str = "rcf_adwin_observed_total";
    /// Counter: [`crate::AdwinDetector::update`] calls that detected
    /// drift and shrank the window.
    pub const ADWIN_DRIFT_FIRES_TOTAL: &str = "rcf_adwin_drift_fires_total";

    // --- lsh_cluster ---------------------------------------------

    /// Counter: bumped on each
    /// `anomstream_triage::LshAlertClusterer::observe` call.
    pub const LSH_ALERTS_OBSERVED_TOTAL: &str = "rcf_lsh_alerts_observed_total";
    /// Counter: LSH alerts that started a new bucket.
    pub const LSH_CLUSTERS_NEW_TOTAL: &str = "rcf_lsh_clusters_new_total";
    /// Counter: LSH alerts folded into an existing bucket.
    pub const LSH_CLUSTERS_JOINED_TOTAL: &str = "rcf_lsh_clusters_joined_total";
    /// Gauge: number of distinct live LSH cluster hashes.
    pub const LSH_CLUSTERS_ACTIVE: &str = "rcf_lsh_clusters_active";

    // --- feedback ------------------------------------------------

    /// Counter: `anomstream_triage::FeedbackStore::label` calls that
    /// accepted the point.
    pub const FEEDBACK_LABELS_OBSERVED_TOTAL: &str = "rcf_feedback_labels_observed_total";
    /// Counter: labels of kind `anomstream_triage::FeedbackLabel::Benign`.
    pub const FEEDBACK_LABELS_BENIGN_TOTAL: &str = "rcf_feedback_labels_benign_total";
    /// Counter: labels of kind
    /// `anomstream_triage::FeedbackLabel::Confirmed`.
    pub const FEEDBACK_LABELS_CONFIRMED_TOTAL: &str = "rcf_feedback_labels_confirmed_total";

    // --- univariate_spot -----------------------------------------

    /// Counter: finite values absorbed through
    /// [`crate::PotDetector::record`].
    pub const SPOT_OBSERVATIONS_TOTAL: &str = "rcf_spot_observations_total";
    /// Counter: peaks collected above the SPOT/DSPOT threshold `u`.
    pub const SPOT_PEAKS_TOTAL: &str = "rcf_spot_peaks_total";
}
390
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn noop_sink_is_noop() {
        let s = NoopSink;
        s.inc_counter("x", 1);
        s.set_gauge("y", 2.0);
        s.observe_histogram("z", 3.0);
    }

    // Everything below relies on `default_sink`, `TestSink` and `Arc`,
    // which only exist with the `std` feature. Gate each test so
    // `cargo test --no-default-features` still compiles.

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_builds_noop_arc() {
        let s = default_sink();
        s.inc_counter("x", 1);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_records_counter_gauge_histogram() {
        let s = TestSink::new();
        s.inc_counter("a", 3);
        s.inc_counter("a", 4);
        s.set_gauge("b", 1.25);
        s.set_gauge("b", 2.5);
        s.observe_histogram("c", 0.1);
        s.observe_histogram("c", 0.2);
        assert_eq!(s.counter("a"), 7);
        assert_eq!(s.gauge("b"), Some(2.5));
        assert_eq!(s.histogram("c"), vec![0.1, 0.2]);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_unseen_metrics_default() {
        let s = TestSink::new();
        assert_eq!(s.counter("nope"), 0);
        assert!(s.gauge("nope").is_none());
        assert!(s.histogram("nope").is_empty());
    }

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_returns_shared_arc() {
        let a = default_sink();
        let b = default_sink();
        // Both handles must reference the same backing allocation
        // — the shared-static optimisation depends on it.
        assert!(
            Arc::ptr_eq(&a, &b),
            "default_sink() should clone a process-wide shared Arc"
        );
    }

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_hits_strong_count_greater_than_one() {
        let pins: Vec<Arc<dyn MetricsSink>> = (0..16).map(|_| default_sink()).collect();
        // Tests run in parallel and many of them clone / drop this same
        // shared Arc, so a before/after delta of `strong_count` is racy.
        // Concurrent holders can only raise the count, so assert a lower
        // bound built purely from references this test can account for:
        // the 16 pins plus the one held by the static itself.
        assert!(Arc::strong_count(&pins[0]) >= pins.len() + 1);
        assert!(pins.iter().all(|p| Arc::ptr_eq(p, &pins[0])));
    }
}