zeph_llm/classifier/
metrics.rs1use std::collections::VecDeque;
10use std::sync::Mutex;
11use std::time::Duration;
12
13use super::ClassifierTask;
14
/// How many of the most recent latency samples each classifier task retains when
/// `ClassifierMetrics::default()` is used.
pub const DEFAULT_RING_BUFFER_SIZE: usize = 100;

/// Per-task latency statistics: a bounded FIFO of the most recent samples plus a
/// lifetime call counter.
struct TaskBuffer {
    /// Most recent samples, oldest at the front; never longer than `capacity`.
    latencies: VecDeque<Duration>,
    /// Maximum number of retained samples. `0` retains none (only `call_count` moves).
    capacity: usize,
    /// Total number of recorded calls, including samples that have since been evicted.
    call_count: u64,
}

impl TaskBuffer {
    /// Creates an empty buffer that retains at most `capacity` samples.
    fn new(capacity: usize) -> Self {
        Self {
            latencies: VecDeque::with_capacity(capacity),
            capacity,
            call_count: 0,
        }
    }

    /// Counts one call and stores its latency, evicting the oldest sample when full.
    ///
    /// A zero `capacity` stores nothing. Checking only `len == capacity` would stop
    /// matching after the first push and let the buffer grow without bound.
    fn record(&mut self, latency: Duration) {
        self.call_count += 1;
        if self.capacity == 0 {
            return;
        }
        // `while`/`>=` keeps `len <= capacity` as an unconditional invariant.
        while self.latencies.len() >= self.capacity {
            self.latencies.pop_front();
        }
        self.latencies.push_back(latency);
    }

    /// Nearest-rank quantile of an already sorted slice, or `None` when it is empty.
    ///
    /// The index is `round((len - 1) * p)`, clamped to the last element, so with two
    /// samples p50 resolves to the larger one. `p` is expected in `[0.0, 1.0]`. Values
    /// outside that range (or NaN) land on the first/last element, because float-to-int
    /// `as` casts saturate.
    fn percentile_of_sorted(sorted: &[Duration], p: f64) -> Option<Duration> {
        let last = sorted.len().checked_sub(1)?;
        // `last` is bounded by the ring-buffer size, so the f64 round trip is exact in
        // practice; any out-of-range index saturates and is clamped below.
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_sign_loss,
            clippy::cast_possible_truncation
        )]
        let idx = ((last as f64) * p).round() as usize;
        sorted.get(idx.min(last)).copied()
    }

    /// Whole milliseconds of `d`, saturating at `u64::MAX` instead of silently
    /// truncating the `u128` returned by `Duration::as_millis`.
    fn millis(d: Duration) -> u64 {
        u64::try_from(d.as_millis()).unwrap_or(u64::MAX)
    }

    /// Summarises the buffer. Both percentiles are read from one sorted copy, so the
    /// samples are copied and sorted once per snapshot instead of once per percentile.
    fn snapshot(&self) -> TaskMetricsSnapshot {
        let mut sorted: Vec<Duration> = self.latencies.iter().copied().collect();
        sorted.sort_unstable();
        TaskMetricsSnapshot {
            call_count: self.call_count,
            p50_ms: Self::percentile_of_sorted(&sorted, 0.50).map(Self::millis),
            p95_ms: Self::percentile_of_sorted(&sorted, 0.95).map(Self::millis),
        }
    }
}

/// Point-in-time latency summary for one classifier task.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TaskMetricsSnapshot {
    /// Total calls recorded since creation (not limited by the ring-buffer size).
    pub call_count: u64,
    /// Median latency in milliseconds over the retained samples; `None` if there are none.
    pub p50_ms: Option<u64>,
    /// 95th-percentile latency in milliseconds over the retained samples; `None` if
    /// there are none.
    pub p95_ms: Option<u64>,
}

79#[derive(Debug, Clone, Default)]
81pub struct ClassifierMetricsSnapshot {
82 pub injection: TaskMetricsSnapshot,
83 pub pii: TaskMetricsSnapshot,
84 pub feedback: TaskMetricsSnapshot,
85}
86
87struct ClassifierMetricsInner {
88 injection: TaskBuffer,
89 pii: TaskBuffer,
90 feedback: TaskBuffer,
91}
92
93pub struct ClassifierMetrics {
98 inner: Mutex<ClassifierMetricsInner>,
99}
100
101impl ClassifierMetrics {
102 #[must_use]
104 pub fn new(ring_buffer_size: usize) -> Self {
105 Self {
106 inner: Mutex::new(ClassifierMetricsInner {
107 injection: TaskBuffer::new(ring_buffer_size),
108 pii: TaskBuffer::new(ring_buffer_size),
109 feedback: TaskBuffer::new(ring_buffer_size),
110 }),
111 }
112 }
113
114 pub fn record(&self, task: ClassifierTask, latency: Duration) {
120 let snapshot = {
121 let mut inner = self.inner.lock().expect("classifier metrics lock poisoned");
122 let buf = match task {
123 ClassifierTask::Injection => &mut inner.injection,
124 ClassifierTask::Pii => &mut inner.pii,
125 ClassifierTask::Feedback => &mut inner.feedback,
126 };
127 buf.record(latency);
128 buf.snapshot()
129 };
130
131 let task_name = match task {
132 ClassifierTask::Injection => "injection",
133 ClassifierTask::Pii => "pii",
134 ClassifierTask::Feedback => "feedback",
135 };
136
137 #[allow(clippy::cast_possible_truncation)]
138 let latency_ms_u64 = latency.as_millis() as u64;
139 tracing::debug!(
140 classifier_task = task_name,
141 latency_ms = latency_ms_u64,
142 p50_ms = snapshot.p50_ms.unwrap_or(0),
143 p95_ms = snapshot.p95_ms.unwrap_or(0),
144 call_count = snapshot.call_count,
145 "classifier_metrics"
146 );
147 }
148
149 #[must_use]
155 pub fn snapshot(&self) -> ClassifierMetricsSnapshot {
156 let inner = self.inner.lock().expect("classifier metrics lock poisoned");
157 ClassifierMetricsSnapshot {
158 injection: inner.injection.snapshot(),
159 pii: inner.pii.snapshot(),
160 feedback: inner.feedback.snapshot(),
161 }
162 }
163}
164
165impl Default for ClassifierMetrics {
166 fn default() -> Self {
167 Self::new(DEFAULT_RING_BUFFER_SIZE)
168 }
169}
170
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a millisecond `Duration`; keeps the sample lists below compact.
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    #[test]
    fn record_single_sample_gives_same_p50_p95() {
        let metrics = ClassifierMetrics::default();
        metrics.record(ClassifierTask::Injection, ms(42));

        let snap = metrics.snapshot();
        assert_eq!(snap.injection.call_count, 1);
        assert_eq!(snap.injection.p50_ms, Some(42));
        assert_eq!(snap.injection.p95_ms, Some(42));
        // Tasks that were never recorded stay empty.
        assert_eq!(snap.pii.call_count, 0);
        assert_eq!(snap.pii.p50_ms, None);
        assert_eq!(snap.feedback.call_count, 0);
    }

    #[test]
    fn p50_p95_correct_for_ten_samples() {
        let metrics = ClassifierMetrics::default();
        (1u64..=10).for_each(|i| metrics.record(ClassifierTask::Pii, ms(i * 10)));

        let snap = metrics.snapshot();
        assert_eq!(snap.pii.call_count, 10);
        // Sorted 10..=100: index round(9 * 0.50) = 5 -> 60.
        assert_eq!(snap.pii.p50_ms, Some(60));
        // Index round(9 * 0.95) = 9 -> 100.
        assert_eq!(snap.pii.p95_ms, Some(100));
    }

    #[test]
    fn ring_buffer_evicts_oldest_when_full() {
        let metrics = ClassifierMetrics::new(3);
        for v in (10u64..=40).step_by(10) {
            metrics.record(ClassifierTask::Feedback, ms(v));
        }

        // The 10 ms sample was evicted; the buffer now holds [20, 30, 40].
        let snap = metrics.snapshot();
        assert_eq!(snap.feedback.call_count, 4);
        assert_eq!(snap.feedback.p50_ms, Some(30));
    }

    #[test]
    fn empty_snapshot_has_none_percentiles() {
        let snap = ClassifierMetrics::default().snapshot();
        assert_eq!(snap.injection.p50_ms, None);
        assert_eq!(snap.injection.p95_ms, None);
        assert_eq!(snap.pii.p50_ms, None);
        assert_eq!(snap.feedback.p50_ms, None);
    }

    #[test]
    fn two_samples_p50_returns_higher_with_round() {
        let metrics = ClassifierMetrics::default();
        metrics.record(ClassifierTask::Injection, ms(10));
        metrics.record(ClassifierTask::Injection, ms(20));

        // Index round(1 * 0.5) = 1 picks the larger sample.
        assert_eq!(metrics.snapshot().injection.p50_ms, Some(20));
    }

    #[test]
    fn p50_p95_correct_for_one_to_ten_ms() {
        let metrics = ClassifierMetrics::default();
        (1u64..=10).for_each(|i| metrics.record(ClassifierTask::Injection, ms(i)));

        let snap = metrics.snapshot();
        assert_eq!(snap.injection.call_count, 10);
        assert_eq!(snap.injection.p50_ms, Some(6));
        assert_eq!(snap.injection.p95_ms, Some(10));
    }

    #[test]
    fn identical_values_give_same_p50_p95() {
        let metrics = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        for _ in 0..DEFAULT_RING_BUFFER_SIZE {
            metrics.record(ClassifierTask::Pii, ms(77));
        }

        let snap = metrics.snapshot();
        assert_eq!(snap.pii.call_count, DEFAULT_RING_BUFFER_SIZE as u64);
        assert_eq!(snap.pii.p50_ms, Some(77));
        assert_eq!(snap.pii.p95_ms, Some(77));
    }

    #[test]
    fn ring_buffer_evicts_oldest_at_default_capacity() {
        let metrics = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        let capacity = DEFAULT_RING_BUFFER_SIZE as u64;
        for i in 1..=capacity {
            metrics.record(ClassifierTask::Injection, ms(i));
        }
        // One more sample pushes out the 1 ms entry.
        metrics.record(ClassifierTask::Injection, ms(200));

        let snap = metrics.snapshot();
        assert_eq!(snap.injection.call_count, capacity + 1);
        // Retained: 2..=100 plus 200. Index round(99 * 0.50) = 50 -> 52.
        assert_eq!(snap.injection.p50_ms, Some(52));
        // Index round(99 * 0.95) = 94 -> 96.
        assert_eq!(snap.injection.p95_ms, Some(96));
    }
}