use std::collections::VecDeque;
use std::sync::Mutex;
use std::time::Duration;
use super::ClassifierTask;
/// Number of latency samples each per-task ring buffer retains when a
/// `ClassifierMetrics` is built through its `Default` implementation.
pub const DEFAULT_RING_BUFFER_SIZE: usize = 100;
/// Bounded ring buffer of the most recent latencies for one classifier task,
/// together with a lifetime call counter.
struct TaskBuffer {
    /// Retained samples, oldest at the front; never longer than `capacity`.
    latencies: VecDeque<Duration>,
    /// Maximum number of samples retained. `0` disables sample retention.
    capacity: usize,
    /// Total number of `record` calls, including samples already evicted.
    call_count: u64,
}

impl TaskBuffer {
    /// Creates an empty buffer that retains at most `capacity` samples.
    fn new(capacity: usize) -> Self {
        Self {
            latencies: VecDeque::with_capacity(capacity),
            capacity,
            call_count: 0,
        }
    }

    /// Counts the call and stores `latency`, evicting the oldest sample when
    /// the buffer is full.
    ///
    /// With a capacity of zero the call is still counted but nothing is
    /// stored; previously that case let the buffer grow without bound.
    fn record(&mut self, latency: Duration) {
        self.call_count += 1;
        if self.capacity == 0 {
            return;
        }
        if self.latencies.len() >= self.capacity {
            self.latencies.pop_front();
        }
        self.latencies.push_back(latency);
    }

    /// Picks the sample at index `round((len - 1) * p)` from an ascending
    /// slice, or `None` when the slice is empty.
    ///
    /// `p` is a fraction (`0.5` for the median). Ties round away from zero, so
    /// the median of two samples is the larger one.
    fn percentile_of_sorted(sorted: &[Duration], p: f64) -> Option<Duration> {
        let last = sorted.len().checked_sub(1)?;
        // Float-to-integer `as` casts saturate (NaN and negatives become 0),
        // and the `min` below caps any `p` above 1.0 at the last index.
        #[allow(
            clippy::cast_precision_loss,
            clippy::cast_sign_loss,
            clippy::cast_possible_truncation
        )]
        let rank = (last as f64 * p).round() as usize;
        sorted.get(rank.min(last)).copied()
    }

    /// Converts a duration to whole milliseconds, saturating at `u64::MAX`
    /// instead of silently wrapping.
    fn saturating_millis(latency: Duration) -> u64 {
        u64::try_from(latency.as_millis()).unwrap_or(u64::MAX)
    }

    /// Builds a snapshot of the call counter and the p50/p95 latencies of the
    /// currently retained samples.
    fn snapshot(&self) -> TaskMetricsSnapshot {
        // Sort once and reuse the result for every percentile.
        let mut sorted: Vec<Duration> = self.latencies.iter().copied().collect();
        sorted.sort_unstable();
        let percentile_ms =
            |p: f64| Self::percentile_of_sorted(&sorted, p).map(Self::saturating_millis);

        TaskMetricsSnapshot {
            call_count: self.call_count,
            p50_ms: percentile_ms(0.50),
            p95_ms: percentile_ms(0.95),
        }
    }
}

/// Point-in-time metrics for a single classifier task.
#[derive(Debug, Clone, Default)]
pub struct TaskMetricsSnapshot {
    /// Total number of latencies recorded for the task.
    pub call_count: u64,
    /// Median latency in milliseconds over the retained samples, or `None`
    /// when no samples are retained.
    pub p50_ms: Option<u64>,
    /// 95th-percentile latency in milliseconds over the retained samples, or
    /// `None` when no samples are retained.
    pub p95_ms: Option<u64>,
}
/// Point-in-time metrics for all classifier tasks, one entry per task.
#[derive(Debug, Clone, Default)]
pub struct ClassifierMetricsSnapshot {
/// Metrics recorded for `ClassifierTask::Injection`.
pub injection: TaskMetricsSnapshot,
/// Metrics recorded for `ClassifierTask::Pii`.
pub pii: TaskMetricsSnapshot,
/// Metrics recorded for `ClassifierTask::Feedback`.
pub feedback: TaskMetricsSnapshot,
}
/// Mutable state of `ClassifierMetrics`: one latency buffer per classifier task.
struct ClassifierMetricsInner {
/// Samples recorded for `ClassifierTask::Injection`.
injection: TaskBuffer,
/// Samples recorded for `ClassifierTask::Pii`.
pii: TaskBuffer,
/// Samples recorded for `ClassifierTask::Feedback`.
feedback: TaskBuffer,
}
/// Collector of per-task classifier latencies.
///
/// All per-task buffers sit behind a single mutex, so recording and
/// snapshotting only need `&self`.
pub struct ClassifierMetrics {
/// Per-task buffers, guarded by one lock.
inner: Mutex<ClassifierMetricsInner>,
}
impl ClassifierMetrics {
#[must_use]
pub fn new(ring_buffer_size: usize) -> Self {
Self {
inner: Mutex::new(ClassifierMetricsInner {
injection: TaskBuffer::new(ring_buffer_size),
pii: TaskBuffer::new(ring_buffer_size),
feedback: TaskBuffer::new(ring_buffer_size),
}),
}
}
pub fn record(&self, task: ClassifierTask, latency: Duration) {
let snapshot = {
let mut inner = self.inner.lock().expect("classifier metrics lock poisoned");
let buf = match task {
ClassifierTask::Injection => &mut inner.injection,
ClassifierTask::Pii => &mut inner.pii,
ClassifierTask::Feedback => &mut inner.feedback,
};
buf.record(latency);
buf.snapshot()
};
let task_name = match task {
ClassifierTask::Injection => "injection",
ClassifierTask::Pii => "pii",
ClassifierTask::Feedback => "feedback",
};
#[allow(clippy::cast_possible_truncation)]
let latency_ms_u64 = latency.as_millis() as u64;
tracing::debug!(
classifier_task = task_name,
latency_ms = latency_ms_u64,
p50_ms = snapshot.p50_ms.unwrap_or(0),
p95_ms = snapshot.p95_ms.unwrap_or(0),
call_count = snapshot.call_count,
"classifier_metrics"
);
}
#[must_use]
pub fn snapshot(&self) -> ClassifierMetricsSnapshot {
let inner = self.inner.lock().expect("classifier metrics lock poisoned");
ClassifierMetricsSnapshot {
injection: inner.injection.snapshot(),
pii: inner.pii.snapshot(),
feedback: inner.feedback.snapshot(),
}
}
}
impl Default for ClassifierMetrics {
fn default() -> Self {
Self::new(DEFAULT_RING_BUFFER_SIZE)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a latency sample of `value` milliseconds.
    fn ms(value: u64) -> Duration {
        Duration::from_millis(value)
    }

    /// A lone sample is both the median and the 95th percentile; tasks that
    /// were never recorded stay empty.
    #[test]
    fn record_single_sample_gives_same_p50_p95() {
        let metrics = ClassifierMetrics::default();
        metrics.record(ClassifierTask::Injection, ms(42));

        let ClassifierMetricsSnapshot {
            injection,
            pii,
            feedback,
        } = metrics.snapshot();
        assert_eq!(injection.call_count, 1);
        assert_eq!(injection.p50_ms, Some(42));
        assert_eq!(injection.p95_ms, Some(42));
        assert_eq!(pii.call_count, 0);
        assert_eq!(pii.p50_ms, None);
        assert_eq!(feedback.call_count, 0);
    }

    /// Samples 10, 20, ..., 100 ms: index round(4.5) = 5 is 60 ms and index
    /// round(8.55) = 9 is 100 ms.
    #[test]
    fn p50_p95_correct_for_ten_samples() {
        let metrics = ClassifierMetrics::default();
        (1u64..=10)
            .map(|step| ms(step * 10))
            .for_each(|sample| metrics.record(ClassifierTask::Pii, sample));

        let pii = metrics.snapshot().pii;
        assert_eq!(pii.call_count, 10);
        assert_eq!(pii.p50_ms, Some(60));
        assert_eq!(pii.p95_ms, Some(100));
    }

    /// A capacity of 3 keeps only 20/30/40 ms after four records, while the
    /// call counter still sees all four.
    #[test]
    fn ring_buffer_evicts_oldest_when_full() {
        let metrics = ClassifierMetrics::new(3);
        for value in [10, 20, 30, 40] {
            metrics.record(ClassifierTask::Feedback, ms(value));
        }

        let feedback = metrics.snapshot().feedback;
        assert_eq!(feedback.call_count, 4);
        assert_eq!(feedback.p50_ms, Some(30));
    }

    /// Without any samples every percentile is `None`.
    #[test]
    fn empty_snapshot_has_none_percentiles() {
        let snapshot = ClassifierMetrics::default().snapshot();
        assert_eq!(snapshot.injection.p50_ms, None);
        assert_eq!(snapshot.injection.p95_ms, None);
        assert_eq!(snapshot.pii.p50_ms, None);
        assert_eq!(snapshot.feedback.p50_ms, None);
    }

    /// With two samples the median index is round(0.5) = 1, i.e. the larger
    /// sample.
    #[test]
    fn two_samples_p50_returns_higher_with_round() {
        let metrics = ClassifierMetrics::default();
        for value in [10, 20] {
            metrics.record(ClassifierTask::Injection, ms(value));
        }

        let injection = metrics.snapshot().injection;
        assert_eq!(injection.p50_ms, Some(20));
    }

    /// Samples 1..=10 ms: p50 lands on index 5 (6 ms), p95 on index 9 (10 ms).
    #[test]
    fn p50_p95_correct_for_one_to_ten_ms() {
        let metrics = ClassifierMetrics::default();
        (1u64..=10)
            .map(ms)
            .for_each(|sample| metrics.record(ClassifierTask::Injection, sample));

        let injection = metrics.snapshot().injection;
        assert_eq!(injection.call_count, 10);
        assert_eq!(injection.p50_ms, Some(6));
        assert_eq!(injection.p95_ms, Some(10));
    }

    /// A buffer filled with one repeated value reports that value for every
    /// percentile.
    #[test]
    fn identical_values_give_same_p50_p95() {
        let metrics = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        (0..DEFAULT_RING_BUFFER_SIZE).for_each(|_| metrics.record(ClassifierTask::Pii, ms(77)));

        let pii = metrics.snapshot().pii;
        assert_eq!(pii.call_count, DEFAULT_RING_BUFFER_SIZE as u64);
        assert_eq!(pii.p50_ms, Some(77));
        assert_eq!(pii.p95_ms, Some(77));
    }

    /// After 1..=100 ms plus one 200 ms sample the buffer holds 2..=100 and
    /// 200: index round(49.5) = 50 is 52 ms and index round(94.05) = 94 is
    /// 96 ms.
    #[test]
    fn ring_buffer_evicts_oldest_at_default_capacity() {
        let metrics = ClassifierMetrics::new(DEFAULT_RING_BUFFER_SIZE);
        let capacity = DEFAULT_RING_BUFFER_SIZE as u64;
        (1u64..=capacity)
            .map(ms)
            .for_each(|sample| metrics.record(ClassifierTask::Injection, sample));
        metrics.record(ClassifierTask::Injection, ms(200));

        let injection = metrics.snapshot().injection;
        assert_eq!(injection.call_count, capacity + 1);
        assert_eq!(injection.p50_ms, Some(52));
        assert_eq!(injection.p95_ms, Some(96));
    }
}