use serde::Serialize;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
const MAX_HISTORY_SECS: usize = 3600;
pub const DEFAULT_WINDOW_SECS: u64 = 300;
#[derive(Debug, Clone, Copy, Serialize)]
pub struct TimingBucket {
pub t_epoch_s: u64,
pub count: u64,
pub total_us: u64,
pub min_us: u64,
pub max_us: u64,
}
impl TimingBucket {
fn empty(t_epoch_s: u64) -> Self {
Self {
t_epoch_s,
count: 0,
total_us: 0,
min_us: 0,
max_us: 0,
}
}
}
struct OperationRing {
buckets: Vec<TimingBucket>,
}
impl OperationRing {
fn new() -> Self {
Self {
buckets: Vec::new(),
}
}
fn record(&mut self, epoch_s: u64, duration_us: u64) {
if self.buckets.is_empty() {
self.buckets
.resize_with(MAX_HISTORY_SECS, || TimingBucket::empty(0));
let idx = (epoch_s as usize) % MAX_HISTORY_SECS;
self.buckets[idx] = TimingBucket {
t_epoch_s: epoch_s,
count: 1,
total_us: duration_us,
min_us: duration_us,
max_us: duration_us,
};
return;
}
let idx = (epoch_s as usize) % MAX_HISTORY_SECS;
let bucket = &mut self.buckets[idx];
if bucket.t_epoch_s == epoch_s {
bucket.count += 1;
bucket.total_us += duration_us;
if duration_us < bucket.min_us {
bucket.min_us = duration_us;
}
if duration_us > bucket.max_us {
bucket.max_us = duration_us;
}
} else {
*bucket = TimingBucket {
t_epoch_s: epoch_s,
count: 1,
total_us: duration_us,
min_us: duration_us,
max_us: duration_us,
};
}
}
fn query(&self, now_s: u64, seconds: u64) -> Vec<TimingBucket> {
if self.buckets.is_empty() {
let start = now_s.saturating_sub(seconds.saturating_sub(1));
return (start..=now_s).map(TimingBucket::empty).collect();
}
let start = now_s.saturating_sub(seconds.saturating_sub(1));
let mut result = Vec::with_capacity(seconds as usize);
for t in start..=now_s {
let idx = (t as usize) % MAX_HISTORY_SECS;
let bucket = &self.buckets[idx];
if bucket.t_epoch_s == t {
result.push(*bucket);
} else {
result.push(TimingBucket::empty(t));
}
}
result
}
}
pub struct OperationTimingStore {
rings: Mutex<HashMap<String, OperationRing>>,
}
impl OperationTimingStore {
pub fn new() -> Self {
Self {
rings: Mutex::new(HashMap::new()),
}
}
pub fn record(&self, op: &str, duration: Duration) {
let epoch_s = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.record_at(op, epoch_s, duration);
}
pub fn record_at(&self, op: &str, epoch_s: u64, duration: Duration) {
let duration_us = duration.as_micros() as u64;
let mut rings = self.rings.lock().expect("timing mutex poisoned");
rings
.entry(op.to_string())
.or_insert_with(OperationRing::new)
.record(epoch_s, duration_us);
}
pub fn query_all(&self, seconds: u64) -> HashMap<String, Vec<TimingBucket>> {
let now_s = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
self.query_all_at(now_s, seconds)
}
pub fn query_all_at(&self, now_s: u64, seconds: u64) -> HashMap<String, Vec<TimingBucket>> {
let seconds = seconds.min(MAX_HISTORY_SECS as u64).max(1);
let rings = self.rings.lock().expect("timing mutex poisoned");
rings
.iter()
.map(|(op, ring)| (op.clone(), ring.query(now_s, seconds)))
.collect()
}
}
impl Default for OperationTimingStore {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn single_sample_creates_bucket() {
let store = OperationTimingStore::new();
store.record_at("publish", 1000, Duration::from_micros(500));
let result = store.query_all_at(1000, 1);
let buckets = &result["publish"];
assert_eq!(buckets.len(), 1);
assert_eq!(buckets[0].t_epoch_s, 1000);
assert_eq!(buckets[0].count, 1);
assert_eq!(buckets[0].total_us, 500);
assert_eq!(buckets[0].min_us, 500);
assert_eq!(buckets[0].max_us, 500);
}
#[test]
fn multiple_samples_same_second_aggregate() {
let store = OperationTimingStore::new();
store.record_at("consume", 2000, Duration::from_micros(100));
store.record_at("consume", 2000, Duration::from_micros(300));
store.record_at("consume", 2000, Duration::from_micros(200));
let result = store.query_all_at(2000, 1);
let b = &result["consume"][0];
assert_eq!(b.count, 3);
assert_eq!(b.total_us, 600);
assert_eq!(b.min_us, 100);
assert_eq!(b.max_us, 300);
}
#[test]
fn different_operations_independent() {
let store = OperationTimingStore::new();
store.record_at("publish", 3000, Duration::from_micros(10));
store.record_at("consume", 3000, Duration::from_micros(20));
let result = store.query_all_at(3000, 1);
assert_eq!(result["publish"][0].total_us, 10);
assert_eq!(result["consume"][0].total_us, 20);
}
#[test]
fn query_fills_gaps_with_empty_buckets() {
let store = OperationTimingStore::new();
store.record_at("ack", 1000, Duration::from_micros(50));
store.record_at("ack", 1004, Duration::from_micros(75));
let result = store.query_all_at(1004, 5);
let buckets = &result["ack"];
assert_eq!(buckets.len(), 5);
assert_eq!(buckets[0].t_epoch_s, 1000);
assert_eq!(buckets[0].count, 1);
for (i, bucket) in buckets.iter().enumerate().take(4).skip(1) {
assert_eq!(bucket.t_epoch_s, 1000 + i as u64);
assert_eq!(bucket.count, 0);
assert_eq!(bucket.min_us, 0);
assert_eq!(bucket.max_us, 0);
}
assert_eq!(buckets[4].t_epoch_s, 1004);
assert_eq!(buckets[4].count, 1);
}
#[test]
fn ring_buffer_wraps_after_max_history() {
let store = OperationTimingStore::new();
store.record_at("op", 0, Duration::from_micros(10));
store.record_at("op", MAX_HISTORY_SECS as u64, Duration::from_micros(20));
let result = store.query_all_at(MAX_HISTORY_SECS as u64, 1);
let b = &result["op"][0];
assert_eq!(b.t_epoch_s, MAX_HISTORY_SECS as u64);
assert_eq!(b.total_us, 20);
assert_eq!(b.count, 1);
}
#[test]
fn query_clamps_to_max_history() {
let store = OperationTimingStore::new();
store.record_at("op", 5000, Duration::from_micros(1));
let result = store.query_all_at(5000, 99999);
let buckets = &result["op"];
assert_eq!(buckets.len(), MAX_HISTORY_SECS);
}
#[test]
fn query_empty_store_returns_no_operations() {
let store = OperationTimingStore::new();
let result = store.query_all_at(1000, 10);
assert!(result.is_empty());
}
#[test]
fn query_returns_empty_buckets_for_stale_data() {
let store = OperationTimingStore::new();
store.record_at("old_op", 100, Duration::from_micros(5));
let result = store.query_all_at(5000, 10);
let buckets = &result["old_op"];
assert_eq!(buckets.len(), 10);
assert!(buckets.iter().all(|b| b.count == 0));
}
#[test]
fn concurrent_recording_is_safe() {
use std::sync::Arc;
use std::thread;
let store = Arc::new(OperationTimingStore::new());
let mut handles = vec![];
for i in 0..10 {
let store = Arc::clone(&store);
handles.push(thread::spawn(move || {
for j in 0..100 {
store.record_at(
"concurrent_op",
1000 + (j % 5),
Duration::from_micros(i * 10 + j),
);
}
}));
}
for h in handles {
h.join().unwrap();
}
let result = store.query_all_at(1004, 5);
let buckets = &result["concurrent_op"];
let total_count: u64 = buckets.iter().map(|b| b.count).sum();
assert_eq!(total_count, 1000); }
#[test]
fn min_tracked_correctly() {
let store = OperationTimingStore::new();
store.record_at("op", 100, Duration::from_micros(500));
store.record_at("op", 100, Duration::from_micros(100));
store.record_at("op", 100, Duration::from_micros(300));
let result = store.query_all_at(100, 1);
assert_eq!(result["op"][0].min_us, 100);
assert_eq!(result["op"][0].max_us, 500);
}
}