Skip to main content

mati_core/mcp/
metrics.rs

1//! Live daemon metrics — per-command call counters and latency percentiles.
2//!
3//! Wired in once at the `dispatch_v2` boundary (the single entry point for
4//! every wire command). Each command records: incremented count, incremented
5//! error count if the response is `Response::Err`, and a latency sample in
6//! microseconds. Latency samples are kept in a per-command ring of the last
7//! `LATENCY_RING_SIZE` observations, from which p50/p95/p99 are computed on
8//! read.
9//!
10//! # Why no new deps
11//!
12//! `hdrhistogram` would be more accurate at higher cardinality, but the
13//! daemon's sustained QPS is low (single-digit req/s under normal hook
14//! load), so a 512-entry ring is plenty for stable percentile estimates
15//! and keeps the dependency surface unchanged. Total memory budget:
//! ~28 commands × 512 × 4 B = 57,344 B ≈ 56 KiB. Negligible.
17//!
18//! # Why `std::sync::Mutex`
19//!
20//! The critical section is a single HashMap lookup + VecDeque push +
21//! u64 increment. No await points are crossed under lock, so the standard
22//! sync `Mutex` is appropriate and avoids pulling in a `tokio::sync::Mutex`
23//! that would force `.await` on the recording path.
24//!
25//! # Global access
26//!
27//! `OnceLock<Arc<Mutex<Metrics>>>` so callers don't need to plumb a new
28//! `Arc` through every dispatch function in `server.rs` / `dispatch_v2.rs`.
29//! Initialized at daemon startup via [`init`]; if `record` is called before
30//! `init` (e.g. in tests that exercise dispatch without booting the daemon),
31//! the recording is silently dropped.
32
33use std::collections::{HashMap, VecDeque};
34use std::sync::{Arc, Mutex, OnceLock};
35use std::time::{Instant, SystemTime, UNIX_EPOCH};
36
37use serde::Serialize;
38
/// Number of recent latency samples kept per command, i.e. the capacity of
/// each per-command latency ring. Sized so a daemon running ~512 req/s for
/// one second has full-window coverage; once a ring is full, every new
/// sample replaces the oldest one.
pub const LATENCY_RING_SIZE: usize = 512;
43
/// Process-global metrics handle. `METRICS.get()` yields `Some` only after
/// [`init`] has been called; until then [`record`] is a no-op and
/// [`snapshot`] returns `None`.
static METRICS: OnceLock<Arc<Mutex<Metrics>>> = OnceLock::new();
46
47/// Initialize the global metrics handle. Called once at daemon startup.
48///
49/// Idempotent: a second call is a no-op and returns `false`. Returns `true`
50/// the first time it succeeds. Safe to call from a single-threaded context
51/// during daemon boot before any dispatch happens.
52pub fn init() -> bool {
53    METRICS.set(Arc::new(Mutex::new(Metrics::new()))).is_ok()
54}
55
56/// Record a single dispatch_v2 invocation.
57///
58/// `command_kind` should be `Command::kind()` (a static string). `elapsed_us`
59/// is the wall-clock duration of the dispatch call. `is_error` is `true`
60/// iff the response was [`crate::mcp::protocol::Response::Err`].
61///
62/// No-op if metrics have not been initialized (e.g. in test code that uses
63/// dispatch_v2 directly without starting the daemon).
64pub fn record(command_kind: &'static str, elapsed_us: u32, is_error: bool) {
65    let Some(handle) = METRICS.get() else {
66        return;
67    };
68    // Lock contention is minimal: only the dispatch_v2 entry point writes.
69    // If poisoned, we drop the sample rather than panic — observability
70    // should never crash the daemon.
71    let Ok(mut m) = handle.lock() else {
72        return;
73    };
74    m.record(command_kind, elapsed_us, is_error);
75}
76
77/// Return a serializable snapshot of current metrics. Cheap — produces
78/// a sorted copy of each latency ring to compute percentiles.
79pub fn snapshot() -> Option<MetricsSnapshot> {
80    let handle = METRICS.get()?;
81    let m = handle.lock().ok()?;
82    Some(m.snapshot())
83}
84
85/// Live per-command stats. Reset on daemon restart.
86struct CommandStats {
87    /// Total successful + failed invocations.
88    count: u64,
89    /// Subset of `count` that returned `Response::Err`.
90    error_count: u64,
91    /// Sum of all observed latencies (microseconds). Saturating add.
92    latency_sum_us: u64,
93    /// Maximum observed latency (microseconds).
94    latency_max_us: u32,
95    /// Bounded ring of recent latency samples for percentile calculation.
96    latencies_us: VecDeque<u32>,
97}
98
99impl CommandStats {
100    fn new() -> Self {
101        Self {
102            count: 0,
103            error_count: 0,
104            latency_sum_us: 0,
105            latency_max_us: 0,
106            latencies_us: VecDeque::with_capacity(LATENCY_RING_SIZE),
107        }
108    }
109
110    fn record(&mut self, elapsed_us: u32, is_error: bool) {
111        self.count = self.count.saturating_add(1);
112        if is_error {
113            self.error_count = self.error_count.saturating_add(1);
114        }
115        self.latency_sum_us = self.latency_sum_us.saturating_add(u64::from(elapsed_us));
116        if elapsed_us > self.latency_max_us {
117            self.latency_max_us = elapsed_us;
118        }
119        if self.latencies_us.len() == LATENCY_RING_SIZE {
120            self.latencies_us.pop_front();
121        }
122        self.latencies_us.push_back(elapsed_us);
123    }
124
125    /// Compute p50, p95, p99 from the current ring (sorted on demand).
126    fn percentiles(&self) -> (u32, u32, u32) {
127        if self.latencies_us.is_empty() {
128            return (0, 0, 0);
129        }
130        let mut sorted: Vec<u32> = self.latencies_us.iter().copied().collect();
131        sorted.sort_unstable();
132        let n = sorted.len();
133        // Nearest-rank percentile: ceil(p/100 * n) - 1, bounded.
134        let pick = |p: u32| -> u32 {
135            let idx = ((u64::from(p) * n as u64).div_ceil(100) as usize).saturating_sub(1);
136            sorted[idx.min(n - 1)]
137        };
138        (pick(50), pick(95), pick(99))
139    }
140}
141
142/// Live metrics state. Owned by the global `OnceLock` handle.
143struct Metrics {
144    started_at_secs: u64,
145    started_instant: Instant,
146    per_command: HashMap<&'static str, CommandStats>,
147}
148
149impl Metrics {
150    fn new() -> Self {
151        Self {
152            started_at_secs: SystemTime::now()
153                .duration_since(UNIX_EPOCH)
154                .map(|d| d.as_secs())
155                .unwrap_or(0),
156            started_instant: Instant::now(),
157            per_command: HashMap::new(),
158        }
159    }
160
161    fn record(&mut self, command_kind: &'static str, elapsed_us: u32, is_error: bool) {
162        self.per_command
163            .entry(command_kind)
164            .or_insert_with(CommandStats::new)
165            .record(elapsed_us, is_error);
166    }
167
168    fn snapshot(&self) -> MetricsSnapshot {
169        let mut commands: Vec<CommandSnapshot> = self
170            .per_command
171            .iter()
172            .map(|(name, stats)| {
173                let (p50, p95, p99) = stats.percentiles();
174                let mean_us = stats.latency_sum_us.checked_div(stats.count).unwrap_or(0) as u32;
175                CommandSnapshot {
176                    name,
177                    count: stats.count,
178                    error_count: stats.error_count,
179                    mean_us,
180                    p50_us: p50,
181                    p95_us: p95,
182                    p99_us: p99,
183                    max_us: stats.latency_max_us,
184                }
185            })
186            .collect();
187        // Stable order: descending by count, then name.
188        commands.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.name.cmp(b.name)));
189
190        let total_calls: u64 = self.per_command.values().map(|s| s.count).sum();
191        let total_errors: u64 = self.per_command.values().map(|s| s.error_count).sum();
192
193        MetricsSnapshot {
194            version: SNAPSHOT_VERSION,
195            uptime_secs: self.started_instant.elapsed().as_secs(),
196            started_at_secs: self.started_at_secs,
197            total_calls,
198            total_errors,
199            commands,
200        }
201    }
202}
203
/// Schema version for the metrics snapshot, emitted in the snapshot's
/// `version` field. Bump when fields change shape so the doctor renderer
/// can pin behavior.
pub const SNAPSHOT_VERSION: u32 = 1;
207
/// Serializable snapshot returned by [`snapshot`] and over the `metrics`
/// socket command.
#[derive(Debug, Serialize)]
pub struct MetricsSnapshot {
    /// Snapshot schema version; filled from `SNAPSHOT_VERSION`.
    pub version: u32,
    /// Whole seconds elapsed (monotonic clock) since the metrics store was
    /// created.
    pub uptime_secs: u64,
    /// Wall-clock creation time of the metrics store, in seconds since the
    /// Unix epoch (0 if the clock read earlier than the epoch).
    pub started_at_secs: u64,
    /// Sum of `count` across all commands.
    pub total_calls: u64,
    /// Sum of `error_count` across all commands.
    pub total_errors: u64,
    /// One row per recorded command, ordered by descending `count`, then by
    /// `name`.
    pub commands: Vec<CommandSnapshot>,
}
219
/// Per-command row in a metrics snapshot. All `*_us` fields are in
/// microseconds.
#[derive(Debug, Serialize)]
pub struct CommandSnapshot {
    /// Command kind string the stats were recorded under.
    pub name: &'static str,
    /// Total invocations recorded for this command.
    pub count: u64,
    /// Invocations recorded with the error flag set.
    pub error_count: u64,
    /// Integer mean latency over every recorded invocation (latency sum
    /// divided by `count`); 0 when `count` is 0.
    pub mean_us: u32,
    /// 50th-percentile latency over the retained latency ring only.
    pub p50_us: u32,
    /// 95th-percentile latency over the retained latency ring only.
    pub p95_us: u32,
    /// 99th-percentile latency over the retained latency ring only.
    pub p99_us: u32,
    /// Largest latency recorded for this command (not limited to the ring).
    pub max_us: u32,
}
232
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a private `Metrics` so tests never touch the process-global
    /// handle.
    fn fresh_metrics() -> Metrics {
        Metrics::new()
    }

    /// Find the snapshot row for `name`, if that command was recorded.
    fn row<'a>(snap: &'a MetricsSnapshot, name: &str) -> Option<&'a CommandSnapshot> {
        snap.commands.iter().find(|c| c.name == name)
    }

    #[test]
    fn record_increments_count_and_tracks_latency() {
        let mut metrics = fresh_metrics();
        for (latency, failed) in [(100, false), (200, false), (300, true)] {
            metrics.record("ping", latency, failed);
        }

        let snap = metrics.snapshot();
        let ping = row(&snap, "ping").expect("ping row present");
        assert_eq!(ping.count, 3);
        assert_eq!(ping.error_count, 1);
        assert_eq!(ping.max_us, 300);
        assert_eq!(ping.mean_us, 200);
    }

    #[test]
    fn percentiles_with_uniform_distribution() {
        let mut metrics = fresh_metrics();
        // Samples 10, 20, ..., 1000: the k-th smallest value is k * 10.
        (1..=100u32).for_each(|k| metrics.record("mem_get", k * 10, false));

        let snap = metrics.snapshot();
        let mem_get = row(&snap, "mem_get").expect("mem_get row present");
        assert_eq!(mem_get.count, 100);
        // Nearest-rank: p50 / p95 / p99 are the 50th / 95th / 99th smallest.
        assert_eq!(mem_get.p50_us, 500);
        assert_eq!(mem_get.p95_us, 950);
        assert_eq!(mem_get.p99_us, 990);
        assert_eq!(mem_get.max_us, 1000);
    }

    #[test]
    fn ring_evicts_oldest_above_capacity() {
        let mut metrics = fresh_metrics();
        // Feed twice the ring capacity; only the newer half may survive.
        let total = LATENCY_RING_SIZE * 2;
        for sample in 1..=total as u32 {
            metrics.record("mem_query", sample, false);
        }

        let snap = metrics.snapshot();
        let mq = row(&snap, "mem_query").unwrap();
        assert_eq!(mq.count, total as u64);
        // The ring now holds LATENCY_RING_SIZE+1 ..= 2*LATENCY_RING_SIZE, so
        // its median sits half a ring above LATENCY_RING_SIZE.
        let expected_p50 = (LATENCY_RING_SIZE + LATENCY_RING_SIZE / 2) as u32;
        assert_eq!(mq.p50_us, expected_p50);
    }

    #[test]
    fn snapshot_is_ordered_by_count_then_name() {
        let mut metrics = fresh_metrics();
        for (name, latency, calls) in [("ping", 10, 5), ("mem_get", 20, 10), ("get", 15, 10)] {
            for _ in 0..calls {
                metrics.record(name, latency, false);
            }
        }

        let snap = metrics.snapshot();
        let names: Vec<&str> = snap.commands.iter().map(|c| c.name).collect();
        // "get" and "mem_get" tie on count, so the name decides; "ping" has
        // the fewest calls and comes last.
        assert_eq!(names, ["get", "mem_get", "ping"]);
    }

    #[test]
    fn empty_metrics_yields_empty_snapshot() {
        let snap = fresh_metrics().snapshot();
        assert!(snap.commands.is_empty());
        assert_eq!(snap.total_errors, 0);
        assert_eq!(snap.total_calls, 0);
    }
}
327}