Skip to main content

zeph_llm/classifier/
metrics.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! Per-task latency ring buffers for classifier calls.
5//!
6//! [`ClassifierMetrics`] stores the last N latency samples per [`ClassifierTask`]
7//! and computes p50/p95 percentiles on demand. Thread-safe via `Mutex`.
8
9use std::collections::VecDeque;
10use std::sync::Mutex;
11use std::time::Duration;
12
13use super::ClassifierTask;
14
/// Number of latency samples each classifier task retains when no explicit
/// ring buffer size is supplied.
pub const DEFAULT_RING_BUFFER_SIZE: usize = 100;
17
/// Bounded window of the most recent latency samples for one classifier task.
struct TaskBuffer {
    /// Retained samples in arrival order; the oldest sample sits at the front.
    latencies: VecDeque<Duration>,
    /// Maximum number of samples kept in `latencies`.
    capacity: usize,
    /// Number of `record` calls since creation, including calls whose samples
    /// have already been evicted.
    call_count: u64,
}

impl TaskBuffer {
    /// Create an empty buffer that retains at most `capacity` samples.
    ///
    /// A `capacity` of zero is accepted: calls are still counted, but no samples
    /// are retained, so percentiles stay `None`.
    fn new(capacity: usize) -> Self {
        Self {
            latencies: VecDeque::with_capacity(capacity),
            capacity,
            call_count: 0,
        }
    }

    /// Count one call and store its latency, evicting the oldest sample(s) first
    /// when the window is full.
    fn record(&mut self, latency: Duration) {
        self.call_count += 1;
        if self.capacity == 0 {
            // Nothing may be retained. Without this guard a `len == capacity`
            // check would only fire on the first call and the deque would
            // then grow without bound.
            return;
        }
        // `>=` in a loop re-establishes the bound even if it was ever exceeded.
        while self.latencies.len() >= self.capacity {
            self.latencies.pop_front();
        }
        self.latencies.push_back(latency);
    }

    /// Copy the retained samples into a vector sorted in ascending order.
    fn sorted_latencies(&self) -> Vec<Duration> {
        let mut sorted: Vec<Duration> = self.latencies.iter().copied().collect();
        sorted.sort_unstable();
        sorted
    }

    /// Pick the `p`-th percentile from an ascending slice.
    ///
    /// The index is `round((len - 1) * p)`; `f64::round` rounds half-way cases
    /// away from zero, so ties pick the higher sample. `p` is expected in
    /// `0.0..=1.0`; the resulting index is clamped to the last element.
    /// Returns `None` when `sorted` is empty.
    fn percentile_of_sorted(sorted: &[Duration], p: f64) -> Option<Duration> {
        let last = sorted.len().checked_sub(1)?;
        // Float/int casts are intentional: the index is clamped to `last` below.
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_sign_loss,
            clippy::cast_possible_truncation
        )]
        let idx = (last as f64 * p).round() as usize;
        sorted.get(idx.min(last)).copied()
    }

    /// Convert a duration to whole milliseconds, saturating at `u64::MAX`
    /// instead of wrapping.
    fn millis_saturating(d: Duration) -> u64 {
        u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
    }

    /// Build a read-only snapshot: total call count plus p50/p95 over the
    /// retained window. The window is sorted once and shared by both percentiles.
    fn snapshot(&self) -> TaskMetricsSnapshot {
        let sorted = self.sorted_latencies();
        let percentile_ms =
            |p: f64| Self::percentile_of_sorted(&sorted, p).map(Self::millis_saturating);
        TaskMetricsSnapshot {
            call_count: self.call_count,
            p50_ms: percentile_ms(0.50),
            p95_ms: percentile_ms(0.95),
        }
    }
}

/// Read-only snapshot for a single classifier task.
#[derive(Debug, Clone, Default)]
pub struct TaskMetricsSnapshot {
    /// Total number of calls recorded for the task.
    pub call_count: u64,
    /// 50th-percentile latency in milliseconds over the retained samples;
    /// `None` when no samples are retained.
    pub p50_ms: Option<u64>,
    /// 95th-percentile latency in milliseconds over the retained samples;
    /// `None` when no samples are retained.
    pub p95_ms: Option<u64>,
}
78
79/// Read-only snapshot of all classifier metrics.
80#[derive(Debug, Clone, Default)]
81pub struct ClassifierMetricsSnapshot {
82    pub injection: TaskMetricsSnapshot,
83    pub pii: TaskMetricsSnapshot,
84    pub feedback: TaskMetricsSnapshot,
85}
86
87struct ClassifierMetricsInner {
88    injection: TaskBuffer,
89    pii: TaskBuffer,
90    feedback: TaskBuffer,
91}
92
93/// Per-task latency ring buffers for classifier calls.
94///
95/// Thread-safe via `Mutex`. Contention is negligible: classifier calls are
96/// infrequent (1–5 per user turn) and each `record()` holds the lock for O(1).
97pub struct ClassifierMetrics {
98    inner: Mutex<ClassifierMetricsInner>,
99}
100
101impl ClassifierMetrics {
102    /// Create a new instance with the given ring buffer capacity per task.
103    #[must_use]
104    pub fn new(ring_buffer_size: usize) -> Self {
105        Self {
106            inner: Mutex::new(ClassifierMetricsInner {
107                injection: TaskBuffer::new(ring_buffer_size),
108                pii: TaskBuffer::new(ring_buffer_size),
109                feedback: TaskBuffer::new(ring_buffer_size),
110            }),
111        }
112    }
113
114    /// Record a latency sample for `task` and emit a structured tracing event.
115    ///
116    /// # Panics
117    ///
118    /// Panics if the internal mutex is poisoned (i.e., another thread panicked while holding it).
119    pub fn record(&self, task: ClassifierTask, latency: Duration) {
120        let snapshot = {
121            let mut inner = self.inner.lock().expect("classifier metrics lock poisoned");
122            let buf = match task {
123                ClassifierTask::Injection => &mut inner.injection,
124                ClassifierTask::Pii => &mut inner.pii,
125                ClassifierTask::Feedback => &mut inner.feedback,
126            };
127            buf.record(latency);
128            buf.snapshot()
129        };
130
131        let task_name = match task {
132            ClassifierTask::Injection => "injection",
133            ClassifierTask::Pii => "pii",
134            ClassifierTask::Feedback => "feedback",
135        };
136
137        #[allow(clippy::cast_possible_truncation)]
138        let latency_ms_u64 = latency.as_millis() as u64;
139        tracing::debug!(
140            classifier_task = task_name,
141            latency_ms = latency_ms_u64,
142            p50_ms = snapshot.p50_ms.unwrap_or(0),
143            p95_ms = snapshot.p95_ms.unwrap_or(0),
144            call_count = snapshot.call_count,
145            "classifier_metrics"
146        );
147    }
148
149    /// Take a point-in-time snapshot of all metrics for TUI consumption.
150    ///
151    /// # Panics
152    ///
153    /// Panics if the internal mutex is poisoned (i.e., another thread panicked while holding it).
154    #[must_use]
155    pub fn snapshot(&self) -> ClassifierMetricsSnapshot {
156        let inner = self.inner.lock().expect("classifier metrics lock poisoned");
157        ClassifierMetricsSnapshot {
158            injection: inner.injection.snapshot(),
159            pii: inner.pii.snapshot(),
160            feedback: inner.feedback.snapshot(),
161        }
162    }
163}
164
165impl Default for ClassifierMetrics {
166    fn default() -> Self {
167        Self::new(DEFAULT_RING_BUFFER_SIZE)
168    }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    // All expected values below follow the index rule `round((len - 1) * p)` on
    // the sorted window. `f64::round` rounds half-way cases away from zero, so a
    // fractional index of exactly `.5` selects the higher sample.

    #[test]
    fn record_single_sample_gives_same_p50_p95() {
        let m = ClassifierMetrics::default();
        m.record(ClassifierTask::Injection, Duration::from_millis(42));
        let s = m.snapshot();
        assert_eq!(s.injection.call_count, 1);
        assert_eq!(s.injection.p50_ms, Some(42));
        assert_eq!(s.injection.p95_ms, Some(42));
        // Other tasks are untouched.
        assert_eq!(s.pii.call_count, 0);
        assert_eq!(s.pii.p50_ms, None);
        assert_eq!(s.pii.p95_ms, None);
        assert_eq!(s.feedback.call_count, 0);
        assert_eq!(s.feedback.p50_ms, None);
        assert_eq!(s.feedback.p95_ms, None);
    }

    #[test]
    fn p50_p95_correct_for_ten_samples() {
        let m = ClassifierMetrics::default();
        for i in 1u64..=10 {
            m.record(ClassifierTask::Pii, Duration::from_millis(i * 10));
        }
        let s = m.snapshot();
        assert_eq!(s.pii.call_count, 10);
        // sorted: [10,20,30,40,50,60,70,80,90,100]
        // p50 idx = round(9 * 0.5) = round(4.5) = 5 → sorted[5] = 60
        assert_eq!(s.pii.p50_ms, Some(60));
        // p95 idx = round(9 * 0.95) = round(8.55) = 9 → sorted[9] = 100
        assert_eq!(s.pii.p95_ms, Some(100));
    }

    #[test]
    fn ring_buffer_evicts_oldest_when_full() {
        let m = ClassifierMetrics::new(3);
        m.record(ClassifierTask::Feedback, Duration::from_millis(10));
        m.record(ClassifierTask::Feedback, Duration::from_millis(20));
        m.record(ClassifierTask::Feedback, Duration::from_millis(30));
        // buffer full — next evicts 10ms
        m.record(ClassifierTask::Feedback, Duration::from_millis(40));
        let s = m.snapshot();
        // call_count keeps counting evicted samples.
        assert_eq!(s.feedback.call_count, 4);
        // sorted ring: [20, 30, 40] — oldest 10ms evicted
        // p50 idx = round(2 * 0.5) = 1 → sorted[1] = 30
        assert_eq!(s.feedback.p50_ms, Some(30));
        // p95 idx = round(2 * 0.95) = round(1.9) = 2 → sorted[2] = 40
        assert_eq!(s.feedback.p95_ms, Some(40));
    }

    #[test]
    fn empty_snapshot_has_none_percentiles() {
        let m = ClassifierMetrics::default();
        let s = m.snapshot();
        assert_eq!(s.injection.p50_ms, None);
        assert_eq!(s.injection.p95_ms, None);
        assert_eq!(s.pii.p50_ms, None);
        assert_eq!(s.pii.p95_ms, None);
        assert_eq!(s.feedback.p50_ms, None);
        assert_eq!(s.feedback.p95_ms, None);
    }

    #[test]
    fn two_samples_p50_returns_higher_with_round() {
        let m = ClassifierMetrics::default();
        m.record(ClassifierTask::Injection, Duration::from_millis(10));
        m.record(ClassifierTask::Injection, Duration::from_millis(20));
        let s = m.snapshot();
        // sorted: [10, 20], p50 idx = round(1 * 0.5) = round(0.5).
        // `f64::round` rounds half-way cases away from zero (it is NOT banker's
        // rounding), so round(0.5) = 1 → sorted[1] = 20ms.
        assert_eq!(s.injection.p50_ms, Some(20));
        // p95 idx = round(1 * 0.95) = 1 → sorted[1] = 20ms
        assert_eq!(s.injection.p95_ms, Some(20));
    }

    #[test]
    fn p50_p95_correct_for_one_to_ten_ms() {
        let m = ClassifierMetrics::default();
        for i in 1u64..=10 {
            m.record(ClassifierTask::Injection, Duration::from_millis(i));
        }
        let s = m.snapshot();
        assert_eq!(s.injection.call_count, 10);
        // sorted: [1,2,3,4,5,6,7,8,9,10]
        // p50 idx = round(9 * 0.5) = round(4.5) = 5 → sorted[5] = 6ms
        assert_eq!(s.injection.p50_ms, Some(6));
        // p95 idx = round(9 * 0.95) = round(8.55) = 9 → sorted[9] = 10ms
        assert_eq!(s.injection.p95_ms, Some(10));
    }

    #[test]
    fn identical_values_give_same_p50_p95() {
        let m = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        for _ in 0..DEFAULT_RING_BUFFER_SIZE {
            m.record(ClassifierTask::Pii, Duration::from_millis(77));
        }
        let s = m.snapshot();
        assert_eq!(s.pii.call_count, DEFAULT_RING_BUFFER_SIZE as u64);
        assert_eq!(s.pii.p50_ms, Some(77));
        assert_eq!(s.pii.p95_ms, Some(77));
    }

    #[test]
    fn ring_buffer_evicts_oldest_at_default_capacity() {
        let m = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        // Fill buffer: samples 1..=100ms
        for i in 1u64..=DEFAULT_RING_BUFFER_SIZE as u64 {
            m.record(ClassifierTask::Injection, Duration::from_millis(i));
        }
        // Record one more — evicts the oldest (1ms)
        m.record(ClassifierTask::Injection, Duration::from_millis(200));
        let s = m.snapshot();
        assert_eq!(s.injection.call_count, DEFAULT_RING_BUFFER_SIZE as u64 + 1);
        // Buffer now holds [2,3,...,100,200] — 100 entries, min is 2ms
        // p50 idx = round(99 * 0.5) = round(49.5) = 50 → sorted[50] = 52ms
        assert_eq!(s.injection.p50_ms, Some(52));
        // p95 idx = round(99 * 0.95) = round(94.05) = 94 → sorted[94] = 96ms
        assert_eq!(s.injection.p95_ms, Some(96));
    }
}