anomstream_core/metrics.rs
1//! Metrics sink abstraction for observability wiring.
2//!
3//! [`MetricsSink`] is a narrow trait exposed by the crate so
4//! long-running agents can drain counters / gauges / histograms
5//! into `Prometheus`, `StatsD`, `OpenTelemetry`, or any other aggregator
6//! without the anomstream-core internals pulling a concrete metrics crate.
7//! Three event types cover everything the forest / thresholded /
8//! pool / meta-drift layers emit:
9//!
10//! - **counter** — monotonically increasing event tallies
11//! (`rcf_updates_total`, `rcf_anomalies_fired_total`, …).
12//! - **gauge** — point-in-time values (`rcf_tenants_resident`,
13//! `rcf_threshold_current`, …).
14//! - **histogram observation** — an `f64` sample the sink should
15//! bucket on its own (`rcf_score`, `rcf_grade`, …).
16//!
17//! Implementations must be `Send + Sync` so the sink can be shared
18//! across threads — every detector type holds an `Arc<dyn
19//! MetricsSink>`. [`NoopSink`] is the default zero-cost fallback
20//! (every call is a `#[inline]` no-op).
21//!
22//! # Wiring
23//!
24//! Every detector exposes a consuming builder method
25//! `.with_metrics_sink(Arc<dyn MetricsSink>)` that installs a sink.
26//! For [`crate::TenantForestPool`] the sink applies to the pool
27//! itself (tenant evictions, resident count); per-tenant detectors
28//! inherit nothing automatically — callers who want per-tenant
29//! observability should install a sink on each detector through the
30//! pool's factory closure.
31
32#[cfg(feature = "std")]
33use std::sync::{Arc, LazyLock};
34
/// Observability hook that anomstream-core detectors report through.
///
/// Each method receives a metric name (the crate's own names live in the
/// [`names`] module) and a value. Aggregation and export are entirely the
/// implementor's business. The `Send + Sync` supertraits allow a single
/// sink to be shared between threads behind an `Arc<dyn MetricsSink>`,
/// and the `Debug` supertrait keeps detectors that hold a sink debuggable.
pub trait MetricsSink: Send + Sync + core::fmt::Debug {
    /// Add `value` to the monotonic counter identified by `name`.
    fn inc_counter(&self, name: &str, value: u64);

    /// Replace the current value of the gauge identified by `name`.
    fn set_gauge(&self, name: &str, value: f64);

    /// Hand one sample `value` to the histogram identified by `name`.
    fn observe_histogram(&self, name: &str, value: f64);
}
44
45/// Zero-cost [`MetricsSink`] implementation — every call is an
46/// inlined no-op. Default sink every detector ships with.
47#[derive(Debug, Clone, Copy, Default)]
48pub struct NoopSink;
49
50impl MetricsSink for NoopSink {
51 #[inline]
52 fn inc_counter(&self, _name: &str, _value: u64) {}
53 #[inline]
54 fn set_gauge(&self, _name: &str, _value: f64) {}
55 #[inline]
56 fn observe_histogram(&self, _name: &str, _value: f64) {}
57}
58
/// The single, process-wide [`NoopSink`] handle behind [`default_sink`].
///
/// The `Arc` is allocated the first time the static is touched and then
/// kept for the life of the process. Constructors that fall back to the
/// no-op sink therefore pay a reference-count increment (a relaxed atomic
/// add inside `Arc::clone`) instead of a fresh heap allocation.
#[cfg(feature = "std")]
static SHARED_NOOP_SINK: LazyLock<Arc<dyn MetricsSink>> = LazyLock::new(|| Arc::new(NoopSink));

/// Return another handle to the process-wide [`NoopSink`].
///
/// Every call yields a pointer to the same allocation, so
/// `Arc::ptr_eq` holds between any two results; only the strong count
/// grows. The return value is already the type-erased
/// `Arc<dyn MetricsSink>` that detectors store, so it can be used
/// directly as a default without further conversion.
#[cfg(feature = "std")]
#[must_use]
pub fn default_sink() -> Arc<dyn MetricsSink> {
    let shared: &Arc<dyn MetricsSink> = LazyLock::force(&SHARED_NOOP_SINK);
    Arc::clone(shared)
}
77
/// Recording [`MetricsSink`] meant for tests and benchmarks.
///
/// Every event lands in mutex-protected maps, so a scenario can later
/// assert on exactly what a detector emitted. Inspection goes through the
/// sink's accessor methods or a full [`TestSinkInner`] snapshot.
#[cfg(feature = "std")]
#[derive(Default, Debug)]
pub struct TestSink {
    /// Recorded events, guarded so concurrent writers are safe.
    inner: ::std::sync::Mutex<TestSinkInner>,
}

/// Plain-data copy of everything a [`TestSink`] has recorded.
#[cfg(feature = "std")]
#[derive(Clone, Default, Debug)]
pub struct TestSinkInner {
    /// Running total for each counter name.
    pub counters: ::std::collections::HashMap<String, u64>,
    /// Most recently set value for each gauge name.
    pub gauges: ::std::collections::HashMap<String, f64>,
    /// All samples for each histogram name, in arrival order.
    pub histograms: ::std::collections::HashMap<String, Vec<f64>>,
}
100
#[cfg(feature = "std")]
impl TestSink {
    /// Create a sink with nothing recorded yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: std::sync::Mutex::default(),
        }
    }

    /// Copy out everything recorded so far.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned, which only happens when
    /// another thread panicked while holding it — a test-only concern.
    #[must_use]
    pub fn snapshot(&self) -> TestSinkInner {
        let guard = self.lock_inner();
        TestSinkInner::clone(&guard)
    }

    /// Running total of counter `name`; `0` if it was never incremented.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[must_use]
    pub fn counter(&self, name: &str) -> u64 {
        let guard = self.lock_inner();
        guard.counters.get(name).copied().unwrap_or(0)
    }

    /// Last value written to gauge `name`; `None` if it was never set.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[must_use]
    pub fn gauge(&self, name: &str) -> Option<f64> {
        let guard = self.lock_inner();
        guard.gauges.get(name).copied()
    }

    /// Owned copy of every sample recorded for histogram `name`, empty
    /// when none were recorded.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    #[must_use]
    pub fn histogram(&self, name: &str) -> Vec<f64> {
        let guard = self.lock_inner();
        guard.histograms.get(name).map_or_else(Vec::new, Vec::clone)
    }

    /// Acquire the internal mutex.
    ///
    /// A poisoned lock is turned into a panic with a descriptive message
    /// rather than a bare `unwrap`; every public caller documents this
    /// under `# Panics`.
    fn lock_inner(&self) -> std::sync::MutexGuard<'_, TestSinkInner> {
        let locked = self.inner.lock();
        locked.expect("TestSink mutex poisoned — another thread panicked holding it")
    }
}
165
#[cfg(feature = "std")]
impl MetricsSink for TestSink {
    /// Add `value` to the running total for `name`, saturating at
    /// `u64::MAX`. A previously unseen name starts at `value`.
    fn inc_counter(&self, name: &str, value: u64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        // Probe with the borrowed `&str` first: a single lookup, and no
        // owned key is allocated when the counter already exists.
        if let Some(total) = inner.counters.get_mut(name) {
            *total = total.saturating_add(value);
        } else {
            inner.counters.insert(name.to_owned(), value);
        }
    }

    /// Overwrite gauge `name` with `value`.
    fn set_gauge(&self, name: &str, value: f64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        // Same allocation-free fast path as `inc_counter`.
        if let Some(slot) = inner.gauges.get_mut(name) {
            *slot = value;
        } else {
            inner.gauges.insert(name.to_owned(), value);
        }
    }

    /// Append `value` to the sample list for histogram `name`,
    /// preserving arrival order.
    fn observe_histogram(&self, name: &str, value: f64) {
        let mut guard = self.lock_inner();
        let inner = &mut *guard;
        // Same allocation-free fast path as `inc_counter`.
        if let Some(samples) = inner.histograms.get_mut(name) {
            samples.push(value);
        } else {
            inner.histograms.insert(name.to_owned(), vec![value]);
        }
    }
}
190
/// Canonical metric names emitted by the crate. Exposed as
/// constants so downstream dashboards can pin label expectations
/// without stringly-typing.
///
/// Every value carries the `rcf_` prefix, and every counter's value
/// ends in `_total`.
///
/// # `SemVer` guarantee
///
/// Every identifier in this module (`UPDATES_TOTAL`,
/// `PROCESS_TOTAL`, …) carries a **`SemVer`-stable string value**.
/// The value assigned to a const never changes across patch /
/// minor releases — SOC dashboards, Prometheus recording rules,
/// and Grafana alert expressions that reference these identifiers
/// or their string values keep working through any non-major
/// bump. New metrics can be added freely; existing ones are
/// renamed only with a major release and a documented migration.
///
/// # Why `&str` over an enum
///
/// [`MetricsSink::inc_counter`] and its siblings take `&str` rather
/// than a `MetricName` enum so downstream sinks can emit
/// crate-external metric names (service-level rollups, tenant-scoped
/// names, dynamic labels) through the same sink pointer. An enum
/// would force every integrator to fork the crate or maintain a
/// parallel dynamic fallback. The const-string pattern keeps
/// `SemVer` pressure on this module's identifiers without paying the
/// enum's API-surface cost. That pressure comes from the stability
/// guarantee above, which is a documented policy the compiler does
/// not check.
pub mod names {
    // --- counters: forest / thresholded / drift / pool / triage ---

    /// Counter: every [`crate::RandomCutForest::update`] call.
    pub const UPDATES_TOTAL: &str = "rcf_updates_total";
    /// Counter: every [`crate::ThresholdedForest::process`] call.
    pub const PROCESS_TOTAL: &str = "rcf_process_total";
    /// Counter: every [`crate::ThresholdedForest::process`] call
    /// whose verdict was flagged `is_anomaly`.
    pub const ANOMALIES_FIRED_TOTAL: &str = "rcf_anomalies_fired_total";
    /// Counter: every [`crate::MetaDriftDetector::observe`] call
    /// that returned `Some(DriftKind::*)` — aggregate of up + down.
    pub const DRIFT_FIRES_TOTAL: &str = "rcf_drift_fires_total";
    /// Counter: CUSUM upward drift fires (`DriftKind::Upward`).
    pub const DRIFT_UP_TOTAL: &str = "rcf_drift_up_total";
    /// Counter: CUSUM downward drift fires (`DriftKind::Downward`).
    pub const DRIFT_DOWN_TOTAL: &str = "rcf_drift_down_total";
    /// Counter: every [`crate::RandomCutForest::delete`] call that
    /// actually removed a point.
    pub const DELETES_TOTAL: &str = "rcf_deletes_total";
    /// Counter: every [`crate::RandomCutForest::attribution`] call
    /// that returned successfully.
    pub const ATTRIBUTION_TOTAL: &str = "rcf_attribution_total";
    /// Counter: inputs rejected because they contained a non-finite
    /// component (NaN / ±inf). Bumped once per rejected public call.
    /// Serves as an upstream data-quality signal for SOC dashboards.
    pub const REJECTED_NAN_TOTAL: &str = "rcf_rejected_nan_total";
    /// Counter: [`crate::RandomCutForest::score_early_term`] calls
    /// that short-circuited (walked fewer than `num_trees`). Pair
    /// with the call site's total to derive the early-stop ratio.
    pub const EARLY_TERM_STOPPED_TOTAL: &str = "rcf_early_term_stopped_total";
    /// Counter: every [`crate::TenantForestPool`] eviction (LRU + TTL
    /// paths combined). Pair with [`TENANT_IDLE_EVICTIONS_TOTAL`] to
    /// derive the pressure-driven share.
    pub const TENANT_EVICTIONS_TOTAL: &str = "rcf_tenant_evictions_total";
    /// Counter: idle / TTL-driven evictions from
    /// [`crate::TenantForestPool::evict_idle`]. Subset of
    /// [`TENANT_EVICTIONS_TOTAL`].
    pub const TENANT_IDLE_EVICTIONS_TOTAL: &str = "rcf_tenant_idle_evictions_total";
    /// Counter: pool-factory invocations, i.e. a fresh tenant entered
    /// the resident set. Tracked separately from
    /// [`TENANT_EVICTIONS_TOTAL`] so tenant churn (creations versus
    /// evictions) can be observed.
    pub const TENANT_CREATED_TOTAL: &str = "rcf_tenant_created_total";
    /// Counter: bootstrap/replay points successfully ingested.
    pub const BOOTSTRAP_POINTS_TOTAL: &str = "rcf_bootstrap_points_total";
    /// Counter: bootstrap/replay points skipped for being non-finite.
    pub const BOOTSTRAP_SKIPPED_TOTAL: &str = "rcf_bootstrap_skipped_total";
    /// Counter: every `FeatureDriftDetector::observe` call.
    pub const FEATURE_DRIFT_OBSERVED_TOTAL: &str = "rcf_feature_drift_observed_total";
    /// Counter: every `AlertClusterer::observe` call, i.e. the total
    /// number of alerts ingested before dedup.
    pub const ALERTS_OBSERVED_TOTAL: &str = "rcf_alerts_observed_total";
    /// Counter: alerts that opened a brand-new cluster (no existing
    /// cluster passed the similarity threshold).
    pub const ALERT_CLUSTERS_NEW_TOTAL: &str = "rcf_alert_clusters_new_total";
    /// Counter: alerts merged into an existing cluster (the dedup
    /// win). Pair with `ALERTS_OBSERVED_TOTAL` to derive the dedup
    /// ratio.
    pub const ALERT_CLUSTERS_JOINED_TOTAL: &str = "rcf_alert_clusters_joined_total";
    /// Counter: clusters pruned because their most recent alert fell
    /// outside the sliding window.
    pub const ALERT_CLUSTERS_PRUNED_TOTAL: &str = "rcf_alert_clusters_pruned_total";

    // --- gauges ---------------------------------------------------

    /// Gauge: number of trees held by a forest.
    pub const FOREST_TREES: &str = "rcf_forest_trees";
    /// Gauge: current adaptive threshold
    /// ([`crate::ThresholdedForest::current_threshold`]).
    pub const THRESHOLD_CURRENT: &str = "rcf_threshold_current";
    /// Gauge: EMA mean of the score stream on a `ThresholdedForest`.
    pub const EMA_MEAN: &str = "rcf_ema_mean";
    /// Gauge: EMA stddev of the score stream on a `ThresholdedForest`.
    pub const EMA_STDDEV: &str = "rcf_ema_stddev";
    /// Gauge: observations folded into the TRCF score-stream EMA.
    /// Combined with the configured `min_observations`, exposes the
    /// warmup progress of a fresh detector.
    pub const OBSERVATIONS_SEEN: &str = "rcf_observations_seen";
    /// Gauge: number of tenants resident in a
    /// [`crate::TenantForestPool`] after each public op.
    pub const TENANTS_RESIDENT: &str = "rcf_tenants_resident";
    /// Gauge: configured tenant capacity of a
    /// [`crate::TenantForestPool`]. Static after construction but
    /// dashboards benefit from being able to chart
    /// `TENANTS_RESIDENT / TENANT_CAPACITY` pressure ratios.
    pub const TENANT_CAPACITY: &str = "rcf_tenant_capacity";
    /// Gauge: active clusters held by an `AlertClusterer`.
    pub const ALERT_CLUSTERS_ACTIVE: &str = "rcf_alert_clusters_active";
    /// Gauge: maximum per-dim PSI of a
    /// `FeatureDriftDetector`. Set on every `psi()` call.
    pub const FEATURE_DRIFT_MAX_PSI: &str = "rcf_feature_drift_max_psi";

    // --- histograms -----------------------------------------------

    /// Histogram: raw anomaly score per scored point.
    pub const SCORE_OBSERVATION: &str = "rcf_score";
    /// Histogram: graded verdict (`[0, 1]`) per processed point.
    pub const GRADE_OBSERVATION: &str = "rcf_grade";
    /// Histogram: upward CUSUM accumulator after each observation.
    pub const DRIFT_S_HIGH: &str = "rcf_drift_s_high";
    /// Histogram: downward CUSUM accumulator after each observation.
    pub const DRIFT_S_LOW: &str = "rcf_drift_s_low";
    /// Histogram: trees actually walked per
    /// [`crate::RandomCutForest::score_early_term`] call. Use with
    /// [`FOREST_TREES`] to compute the latency savings distribution.
    pub const EARLY_TERM_TREES: &str = "rcf_early_term_trees";

    // --- hot_path ------------------------------------------------

    /// Counter: `anomstream_hotpath::UpdateSampler` `accept_*` calls that
    /// admitted the offer.
    pub const HOT_PATH_SAMPLER_ACCEPTED_TOTAL: &str = "rcf_hot_path_sampler_accepted_total";
    /// Counter: `anomstream_hotpath::UpdateSampler` `accept_*` calls that
    /// rejected the offer (stride / per-flow-hash residue mismatch).
    pub const HOT_PATH_SAMPLER_REJECTED_TOTAL: &str = "rcf_hot_path_sampler_rejected_total";
    /// Counter: points successfully enqueued through a
    /// `anomstream_hotpath::UpdateProducer::try_enqueue` call.
    pub const HOT_PATH_QUEUE_ENQUEUED_TOTAL: &str = "rcf_hot_path_queue_enqueued_total";
    /// Counter: points dropped because the hot-path MPSC queue was
    /// full. A non-zero value indicates the classifier is producing
    /// faster than the updater drains.
    pub const HOT_PATH_QUEUE_DROPPED_TOTAL: &str = "rcf_hot_path_queue_dropped_total";
    /// Counter: `anomstream_hotpath::PrefixRateCap::check_and_record` calls that
    /// admitted the offer.
    pub const HOT_PATH_PREFIX_ADMITTED_TOTAL: &str = "rcf_hot_path_prefix_admitted_total";
    /// Counter: `anomstream_hotpath::PrefixRateCap::check_and_record` calls that
    /// capped the offer (bucket at limit).
    pub const HOT_PATH_PREFIX_CAPPED_TOTAL: &str = "rcf_hot_path_prefix_capped_total";

    // --- drift_aware ---------------------------------------------

    /// Counter: [`crate::DriftAwareForest`] shadow → primary swaps.
    pub const DRIFT_AWARE_SWAPS_TOTAL: &str = "rcf_drift_aware_swaps_total";
    /// Counter: [`crate::DriftAwareForest::on_drift`] calls that
    /// actually spawned a shadow (not gated out by
    /// `min_primary_age` or an in-flight shadow).
    pub const DRIFT_AWARE_ON_DRIFT_TOTAL: &str = "rcf_drift_aware_on_drift_total";
    /// Gauge: `1.0` while a shadow is warming, `0.0` otherwise.
    pub const DRIFT_AWARE_SHADOW_ACTIVE: &str = "rcf_drift_aware_shadow_active";

    // --- adwin ---------------------------------------------------

    /// Counter: finite values folded into an
    /// [`crate::AdwinDetector::update`] window.
    pub const ADWIN_OBSERVED_TOTAL: &str = "rcf_adwin_observed_total";
    /// Counter: [`crate::AdwinDetector::update`] calls that
    /// detected drift and shrank the window.
    pub const ADWIN_DRIFT_FIRES_TOTAL: &str = "rcf_adwin_drift_fires_total";

    // --- lsh_cluster ---------------------------------------------

    /// Counter: every `anomstream_triage::LshAlertClusterer::observe` call.
    pub const LSH_ALERTS_OBSERVED_TOTAL: &str = "rcf_lsh_alerts_observed_total";
    /// Counter: LSH alerts that opened a brand-new bucket.
    pub const LSH_CLUSTERS_NEW_TOTAL: &str = "rcf_lsh_clusters_new_total";
    /// Counter: LSH alerts merged into an existing bucket.
    pub const LSH_CLUSTERS_JOINED_TOTAL: &str = "rcf_lsh_clusters_joined_total";
    /// Gauge: distinct active LSH cluster hashes.
    pub const LSH_CLUSTERS_ACTIVE: &str = "rcf_lsh_clusters_active";

    // --- feedback ------------------------------------------------

    /// Counter: every `anomstream_triage::FeedbackStore::label` call that
    /// accepted the point.
    pub const FEEDBACK_LABELS_OBSERVED_TOTAL: &str = "rcf_feedback_labels_observed_total";
    /// Counter: feedback labels of kind
    /// `anomstream_triage::FeedbackLabel::Benign`.
    pub const FEEDBACK_LABELS_BENIGN_TOTAL: &str = "rcf_feedback_labels_benign_total";
    /// Counter: feedback labels of kind
    /// `anomstream_triage::FeedbackLabel::Confirmed`.
    pub const FEEDBACK_LABELS_CONFIRMED_TOTAL: &str = "rcf_feedback_labels_confirmed_total";

    // --- univariate_spot -----------------------------------------

    /// Counter: every finite value folded via
    /// [`crate::PotDetector::record`].
    pub const SPOT_OBSERVATIONS_TOTAL: &str = "rcf_spot_observations_total";
    /// Counter: peaks accumulated above the SPOT/DSPOT threshold `u`.
    pub const SPOT_PEAKS_TOTAL: &str = "rcf_spot_peaks_total";
}
390
#[cfg(test)]
mod tests {
    use super::*;

    /// `NoopSink` needs no `std` and must accept every event shape.
    #[test]
    fn noop_sink_is_noop() {
        let s = NoopSink;
        s.inc_counter("x", 1);
        s.set_gauge("y", 2.0);
        s.observe_histogram("z", 3.0);
    }

    // Everything below touches `std`-gated items (`default_sink`,
    // `TestSink`, `Arc`), so it is gated on the same feature. Without the
    // gate, `cargo test` with the feature disabled fails to compile.

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_builds_noop_arc() {
        let s = default_sink();
        s.inc_counter("x", 1);
        assert_eq!(format!("{s:?}"), "NoopSink");
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_records_counter_gauge_histogram() {
        let s = TestSink::new();
        s.inc_counter("a", 3);
        s.inc_counter("a", 4);
        s.set_gauge("b", 1.25);
        s.set_gauge("b", 2.5);
        s.observe_histogram("c", 0.1);
        s.observe_histogram("c", 0.2);
        assert_eq!(s.counter("a"), 7);
        assert_eq!(s.gauge("b"), Some(2.5));
        assert_eq!(s.histogram("c"), vec![0.1, 0.2]);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_unseen_metrics_default() {
        let s = TestSink::new();
        assert_eq!(s.counter("nope"), 0);
        assert!(s.gauge("nope").is_none());
        assert!(s.histogram("nope").is_empty());
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_counter_saturates() {
        let s = TestSink::new();
        s.inc_counter("big", u64::MAX - 1);
        s.inc_counter("big", 5);
        assert_eq!(s.counter("big"), u64::MAX);
    }

    #[cfg(feature = "std")]
    #[test]
    fn test_sink_is_shareable_across_threads() {
        let sink = TestSink::new();
        std::thread::scope(|scope| {
            for _ in 0..4 {
                scope.spawn(|| {
                    for _ in 0..100 {
                        sink.inc_counter("shared", 1);
                        sink.observe_histogram("h", 1.0);
                    }
                });
            }
        });
        assert_eq!(sink.counter("shared"), 400);
        assert_eq!(sink.histogram("h").len(), 400);
    }

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_returns_shared_arc() {
        let a = default_sink();
        let b = default_sink();
        // Both handles must reference the same backing allocation;
        // the shared-static optimisation depends on it.
        assert!(
            Arc::ptr_eq(&a, &b),
            "default_sink() should clone a process-wide shared Arc"
        );
    }

    #[cfg(feature = "std")]
    #[test]
    fn default_sink_hits_strong_count_greater_than_one() {
        let pins: Vec<Arc<dyn MetricsSink>> = (0..16).map(|_| default_sink()).collect();
        // The static keeps one strong reference and each pin adds one.
        // Tests running in parallel can add and drop their own handles,
        // but they can never push the count below what this test holds.
        // So a lower bound taken while `pins` is alive is race-free. A
        // before/after delta is not: a concurrent test may drop its
        // handles between the two reads.
        assert!(Arc::strong_count(&pins[0]) > pins.len());
        assert!(pins.iter().all(|p| Arc::ptr_eq(p, &pins[0])));
    }
}