use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex, OnceLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
/// Maximum number of latency samples kept per command. Once a command's
/// window holds this many samples, recording another one evicts the oldest;
/// percentiles are computed over the samples currently in that window.
pub const LATENCY_RING_SIZE: usize = 512;
/// Process-wide metrics registry. It stays unset until [`init`] stores a
/// value; while unset, [`record`] does nothing and [`snapshot`] returns `None`.
static METRICS: OnceLock<Arc<Mutex<Metrics>>> = OnceLock::new();
/// Installs a fresh, empty metrics registry into the global slot.
///
/// Returns `true` when this call installed the registry and `false` when one
/// was already present, in which case the existing registry is left untouched
/// and the freshly built one is dropped.
pub fn init() -> bool {
    let registry = Arc::new(Mutex::new(Metrics::new()));
    match METRICS.set(registry) {
        Ok(()) => true,
        Err(_) => false,
    }
}
/// Records one finished command invocation in the global registry.
///
/// * `command_kind` - key under which the sample is aggregated.
/// * `elapsed_us` - latency of the invocation (microseconds, going by the
///   `_us` suffix).
/// * `is_error` - whether the invocation counts towards the error total.
///
/// Best-effort by design: the call silently does nothing when the registry
/// has not been initialised or when its mutex is poisoned.
pub fn record(command_kind: &'static str, elapsed_us: u32, is_error: bool) {
    if let Some(registry) = METRICS.get() {
        if let Ok(mut metrics) = registry.lock() {
            metrics.record(command_kind, elapsed_us, is_error);
        }
    }
}
/// Produces a point-in-time copy of the global metrics.
///
/// Returns `None` when the registry has not been initialised or when its
/// mutex is poisoned; otherwise the lock is held only while the snapshot is
/// being built.
pub fn snapshot() -> Option<MetricsSnapshot> {
    METRICS
        .get()
        .and_then(|registry| registry.lock().ok())
        .map(|metrics| metrics.snapshot())
}
/// Running aggregates for a single command kind.
struct CommandStats {
/// Number of recorded invocations; saturates instead of wrapping.
count: u64,
/// Number of recorded invocations flagged as errors; saturates.
error_count: u64,
/// Sum of every recorded latency (not just the retained window); saturates.
latency_sum_us: u64,
/// Largest latency ever recorded for this command.
latency_max_us: u32,
/// The most recent latencies, oldest at the front, holding at most
/// `LATENCY_RING_SIZE` entries. Percentiles are computed from this window.
latencies_us: VecDeque<u32>,
}
impl CommandStats {
    /// Creates an empty accumulator whose latency window is pre-allocated
    /// for `LATENCY_RING_SIZE` samples.
    fn new() -> Self {
        let window = VecDeque::with_capacity(LATENCY_RING_SIZE);
        Self {
            latencies_us: window,
            latency_max_us: 0,
            latency_sum_us: 0,
            error_count: 0,
            count: 0,
        }
    }

    /// Folds one invocation into the aggregates.
    ///
    /// The lifetime counters (`count`, `error_count`, `latency_sum_us`)
    /// saturate rather than wrap; the latency window drops its oldest sample
    /// once it already holds `LATENCY_RING_SIZE` entries.
    fn record(&mut self, elapsed_us: u32, is_error: bool) {
        // Make room first so the window never grows past its capacity.
        if self.latencies_us.len() == LATENCY_RING_SIZE {
            self.latencies_us.pop_front();
        }
        self.latencies_us.push_back(elapsed_us);

        self.latency_max_us = self.latency_max_us.max(elapsed_us);
        self.latency_sum_us = self.latency_sum_us.saturating_add(u64::from(elapsed_us));
        // `u64::from(bool)` is 0 or 1, so successes leave the error count alone.
        self.error_count = self.error_count.saturating_add(u64::from(is_error));
        self.count = self.count.saturating_add(1);
    }

    /// Returns the `(p50, p95, p99)` latencies of the retained window using
    /// the nearest-rank method, or `(0, 0, 0)` when nothing has been recorded.
    fn percentiles(&self) -> (u32, u32, u32) {
        let total = self.latencies_us.len();
        if total == 0 {
            return (0, 0, 0);
        }

        let mut ordered: Vec<u32> = Vec::with_capacity(total);
        ordered.extend(self.latencies_us.iter().copied());
        ordered.sort_unstable();

        let last = total - 1;
        let at = |percent: u64| -> u32 {
            // Nearest rank is 1-based: ceil(percent * total / 100).
            let rank = (percent * total as u64).div_ceil(100) as usize;
            ordered[rank.saturating_sub(1).min(last)]
        };
        (at(50), at(95), at(99))
    }
}
/// Registry of per-command statistics plus the time it was created.
struct Metrics {
/// Wall-clock creation time in whole seconds since the Unix epoch; 0 when
/// the system clock reported a time before the epoch.
started_at_secs: u64,
/// `Instant` captured at creation; the snapshot derives `uptime_secs` from it.
started_instant: Instant,
/// Statistics keyed by the command kind passed to `record`.
per_command: HashMap<&'static str, CommandStats>,
}
impl Metrics {
    /// Creates an empty registry stamped with the current time.
    fn new() -> Self {
        Self {
            // `duration_since` fails when the system clock is set before the
            // Unix epoch; report 0 in that case instead of failing.
            started_at_secs: SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .map(|d| d.as_secs())
                .unwrap_or(0),
            started_instant: Instant::now(),
            per_command: HashMap::new(),
        }
    }

    /// Folds one invocation into the statistics for `command_kind`, creating
    /// the entry the first time that command is seen.
    fn record(&mut self, command_kind: &'static str, elapsed_us: u32, is_error: bool) {
        self.per_command
            .entry(command_kind)
            .or_insert_with(CommandStats::new)
            .record(elapsed_us, is_error);
    }

    /// Builds a point-in-time summary of every recorded command.
    ///
    /// Rows are ordered by descending call count, ties broken by ascending
    /// name. `total_calls` and `total_errors` saturate at `u64::MAX`, matching
    /// the saturating per-command counters, so building a snapshot cannot
    /// panic or wrap on overflow.
    fn snapshot(&self) -> MetricsSnapshot {
        let mut total_calls: u64 = 0;
        let mut total_errors: u64 = 0;
        let mut commands: Vec<CommandSnapshot> = Vec::with_capacity(self.per_command.len());

        for (&name, stats) in &self.per_command {
            total_calls = total_calls.saturating_add(stats.count);
            total_errors = total_errors.saturating_add(stats.error_count);

            let (p50_us, p95_us, p99_us) = stats.percentiles();
            // `checked_div` yields `None` for a zero count; report a mean of 0 then.
            let mean = stats.latency_sum_us.checked_div(stats.count).unwrap_or(0);
            commands.push(CommandSnapshot {
                name,
                count: stats.count,
                error_count: stats.error_count,
                // A mean of `u32` samples always fits in `u32`; the clamp makes
                // the narrowing cast explicitly lossless.
                mean_us: mean.min(u64::from(u32::MAX)) as u32,
                p50_us,
                p95_us,
                p99_us,
                max_us: stats.latency_max_us,
            });
        }

        // Hash-map iteration order is unspecified, so sort for a stable output.
        commands.sort_by(|a, b| b.count.cmp(&a.count).then_with(|| a.name.cmp(b.name)));

        MetricsSnapshot {
            version: SNAPSHOT_VERSION,
            uptime_secs: self.started_instant.elapsed().as_secs(),
            started_at_secs: self.started_at_secs,
            total_calls,
            total_errors,
            commands,
        }
    }
}
/// Value written to the `version` field of every [`MetricsSnapshot`].
// NOTE(review): presumably bumped whenever the serialised layout changes — confirm.
pub const SNAPSHOT_VERSION: u32 = 1;
/// Serialisable point-in-time summary of the whole metrics registry.
#[derive(Debug, Serialize)]
pub struct MetricsSnapshot {
/// Snapshot format version; always `SNAPSHOT_VERSION`.
pub version: u32,
/// Whole seconds elapsed since the registry was created.
pub uptime_secs: u64,
/// Registry creation time in seconds since the Unix epoch (0 if the clock
/// reported a time before the epoch).
pub started_at_secs: u64,
/// Sum of `count` over all commands.
pub total_calls: u64,
/// Sum of `error_count` over all commands.
pub total_errors: u64,
/// One row per command, ordered by descending `count`, then ascending `name`.
pub commands: Vec<CommandSnapshot>,
}
/// Serialisable summary of one command's statistics.
#[derive(Debug, Serialize)]
pub struct CommandSnapshot {
/// Command kind, exactly as passed to `record`.
pub name: &'static str,
/// Number of recorded invocations.
pub count: u64,
/// Number of recorded invocations flagged as errors.
pub error_count: u64,
/// Latency sum divided by `count` (integer division) over every recorded
/// invocation; 0 when nothing was recorded.
pub mean_us: u32,
/// 50th-percentile latency of the retained sample window.
pub p50_us: u32,
/// 95th-percentile latency of the retained sample window.
pub p95_us: u32,
/// 99th-percentile latency of the retained sample window.
pub p99_us: u32,
/// Largest latency over every recorded invocation.
pub max_us: u32,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a private registry so tests never touch the process-wide one.
    fn fresh_metrics() -> Metrics {
        Metrics::new()
    }

    /// Finds the snapshot row for `name`, if that command was recorded.
    fn row<'a>(snap: &'a MetricsSnapshot, name: &str) -> Option<&'a CommandSnapshot> {
        snap.commands.iter().find(|c| c.name == name)
    }

    #[test]
    fn record_increments_count_and_tracks_latency() {
        let mut metrics = fresh_metrics();
        let samples: [(u32, bool); 3] = [(100, false), (200, false), (300, true)];
        for &(latency, failed) in samples.iter() {
            metrics.record("ping", latency, failed);
        }

        let snap = metrics.snapshot();
        let ping = row(&snap, "ping").expect("ping row present");
        assert_eq!(ping.count, 3);
        assert_eq!(ping.error_count, 1);
        assert_eq!(ping.max_us, 300);
        assert_eq!(ping.mean_us, 200);
    }

    #[test]
    fn percentiles_with_uniform_distribution() {
        let mut metrics = fresh_metrics();
        // Latencies 10, 20, ..., 1000.
        for step in 1..=100u32 {
            metrics.record("mem_get", step * 10, false);
        }

        let snap = metrics.snapshot();
        let mem_get = row(&snap, "mem_get").expect("mem_get row present");
        assert_eq!(mem_get.count, 100);
        assert_eq!(mem_get.p50_us, 500);
        assert_eq!(mem_get.p95_us, 950);
        assert_eq!(mem_get.p99_us, 990);
        assert_eq!(mem_get.max_us, 1000);
    }

    #[test]
    fn ring_evicts_oldest_above_capacity() {
        let mut metrics = fresh_metrics();
        // Record twice the window size so the first half must be evicted.
        let samples = (LATENCY_RING_SIZE * 2) as u32;
        for latency in 1..=samples {
            metrics.record("mem_query", latency, false);
        }

        let snap = metrics.snapshot();
        let mem_query = row(&snap, "mem_query").unwrap();
        assert_eq!(mem_query.count, (LATENCY_RING_SIZE * 2) as u64);
        // Only the second half survives, so the median sits in its middle.
        let expected_p50 = (LATENCY_RING_SIZE + LATENCY_RING_SIZE / 2) as u32;
        assert_eq!(mem_query.p50_us, expected_p50);
    }

    #[test]
    fn snapshot_is_ordered_by_count_then_name() {
        let mut metrics = fresh_metrics();
        let workload: [(&'static str, usize, u32); 3] =
            [("ping", 5, 10), ("mem_get", 10, 20), ("get", 10, 15)];
        for &(name, calls, latency) in workload.iter() {
            for _ in 0..calls {
                metrics.record(name, latency, false);
            }
        }

        let snap = metrics.snapshot();
        // "get" and "mem_get" tie on count and fall back to name order.
        for (idx, expected) in ["get", "mem_get", "ping"].iter().enumerate() {
            assert_eq!(snap.commands[idx].name, *expected);
        }
    }

    #[test]
    fn empty_metrics_yields_empty_snapshot() {
        let snap = fresh_metrics().snapshot();
        assert_eq!(snap.total_calls, 0);
        assert_eq!(snap.total_errors, 0);
        assert!(snap.commands.is_empty());
    }
}