Skip to main content

atd_runtime/
metrics.rs

//! Atomic metrics counters surfaced via `Server::metrics_snapshot()`.
//!
//! SP-concurrency-baseline §5.7 — gives adopters per-server observability
//! without forcing a Prometheus-grade pipeline. The scalar counters are
//! lock-free atomics updated on the hot path (accept, dispatch, audit) using
//! `Ordering::Relaxed`: per-counter atomicity is enough, because a snapshot
//! makes no promise that different counters were read at the same instant.
//! Per-error-code counts are the one exception — they live in a mutex-guarded
//! map that is only touched on error paths.
//!
//! Future SP-observability-v2 will layer latency histograms (p50 / p99) and
//! a `/metrics` Prometheus endpoint on top of this counter base.
11
12use serde::Serialize;
13use std::collections::BTreeMap;
14use std::sync::Mutex;
15use std::sync::atomic::{AtomicU64, Ordering};
16
/// Hot-path counters owned by `ServerState`.
///
/// The scalar counters are [`AtomicU64`]s, so every transport (UDS via
/// `atd-server`, HTTP via `atd-server-http`) and every dispatch path can bump
/// them without coordinating. The sparse per-error-code map is the one
/// exception: it sits behind a [`Mutex`] and is updated via
/// [`MetricsCounters::record_error`].
#[derive(Default, Debug)]
pub struct MetricsCounters {
    /// Total UDS / HTTP connections accepted since startup.
    pub accepted_connections: AtomicU64,
    /// Total `Request` frames dispatched (Ping, Hello, ToolList, ToolSchema,
    /// RunTool — every variant counts).
    pub dispatched_requests: AtomicU64,
    /// Per-error-code counter, lazily allocated. Keyed by `Response::Error.code`
    /// (the same u16s `atd_protocol::ERR_*` constants define). Sparse: most
    /// codes are never hit on a given server. Unlike the other fields this is
    /// lock-protected, not atomic.
    pub dispatch_errors_by_code: Mutex<BTreeMap<u16, u64>>,
    /// Total audit events successfully enqueued onto the audit sink.
    pub audit_events_total: AtomicU64,
    /// Total audit events dropped because the audit sink's queue was full
    /// (SP-concurrency-baseline §5.4 mpsc backpressure).
    pub audit_drops_total: AtomicU64,
}
37
38impl MetricsCounters {
39    /// Bump a dispatch-error code counter. Called from dispatch error
40    /// branches. Lock contention is acceptable here: errors are by
41    /// definition tail events; the success hot path doesn't touch this map.
42    pub fn record_error(&self, code: u16) {
43        if let Ok(mut map) = self.dispatch_errors_by_code.lock() {
44            *map.entry(code).or_insert(0) += 1;
45        }
46    }
47
48    /// Capture a snapshot suitable for serializing into a `/metrics` body
49    /// or a health-check endpoint.
50    pub fn snapshot(&self) -> MetricsSnapshot {
51        let errors_by_code = self
52            .dispatch_errors_by_code
53            .lock()
54            .map(|m| m.clone())
55            .unwrap_or_default();
56        MetricsSnapshot {
57            accepted_connections: self.accepted_connections.load(Ordering::Relaxed),
58            dispatched_requests: self.dispatched_requests.load(Ordering::Relaxed),
59            dispatch_errors_by_code: errors_by_code,
60            audit_events_total: self.audit_events_total.load(Ordering::Relaxed),
61            audit_drops_total: self.audit_drops_total.load(Ordering::Relaxed),
62        }
63    }
64}
65
66/// JSON-serialisable snapshot of [`MetricsCounters`]. Returned by
67/// `Server::metrics_snapshot()` and `atd-conformance` scenarios that need
68/// to assert post-storm invariants (e.g. `audit_drops_total == 0`).
69#[derive(Debug, Clone, Serialize)]
70pub struct MetricsSnapshot {
71    pub accepted_connections: u64,
72    pub dispatched_requests: u64,
73    pub dispatch_errors_by_code: BTreeMap<u16, u64>,
74    pub audit_events_total: u64,
75    pub audit_drops_total: u64,
76}
77
#[cfg(test)]
mod tests {
    use super::*;

    /// The four scalar counters of a snapshot, in declaration order:
    /// accepted, dispatched, audit events, audit drops.
    fn scalars(snap: &MetricsSnapshot) -> [u64; 4] {
        [
            snap.accepted_connections,
            snap.dispatched_requests,
            snap.audit_events_total,
            snap.audit_drops_total,
        ]
    }

    #[test]
    fn defaults_zero() {
        let snap = MetricsCounters::default().snapshot();
        assert_eq!(scalars(&snap), [0, 0, 0, 0]);
        assert!(snap.dispatch_errors_by_code.is_empty());
    }

    #[test]
    fn atomic_increments_visible_in_snapshot() {
        let counters = MetricsCounters::default();
        let bumps = [
            (&counters.accepted_connections, 3),
            (&counters.dispatched_requests, 7),
            (&counters.audit_events_total, 11),
            (&counters.audit_drops_total, 1),
        ];
        for (counter, delta) in bumps {
            counter.fetch_add(delta, Ordering::Relaxed);
        }
        assert_eq!(scalars(&counters.snapshot()), [3, 7, 11, 1]);
    }

    #[test]
    fn record_error_accumulates_per_code() {
        let counters = MetricsCounters::default();
        // ERR_CAPABILITY_DENIED (1001) twice, ERR_RATE_LIMITED (1002) once.
        for code in [1001, 1001, 1002] {
            counters.record_error(code);
        }
        let by_code = counters.snapshot().dispatch_errors_by_code;
        assert_eq!(by_code.get(&1001), Some(&2));
        assert_eq!(by_code.get(&1002), Some(&1));
        assert_eq!(by_code.get(&9999), None);
    }

    #[test]
    fn snapshot_is_serialisable_to_json() {
        let counters = MetricsCounters::default();
        counters.accepted_connections.fetch_add(5, Ordering::Relaxed);
        counters.record_error(1010);
        let json = serde_json::to_value(counters.snapshot()).expect("serialize snapshot");
        assert_eq!(json["accepted_connections"], 5);
        assert_eq!(json["dispatch_errors_by_code"]["1010"], 1);
    }
}