// qrusty 0.20.7
//
// A trusty priority queue server built with Rust.
// src/operation_timing.rs

//! # Operation Timing Store
//!
//! Implements: SYS-0017, API-0013
//!
//! Per-operation timing metrics with per-second bucket aggregation and a
//! fixed-size ring buffer retaining up to 1 hour of history.  All access is
//! via a `std::sync::Mutex` so that recording a sample is cheap and never
//! requires an async context.

use serde::Serialize;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Upper bound, in seconds, on the history kept for each operation
/// (one hour, written as minutes × seconds).
const MAX_HISTORY_SECS: usize = 60 * 60;

/// Window length, in seconds, that the `/operation-timings` endpoint returns
/// by default (five minutes).
pub const DEFAULT_WINDOW_SECS: u64 = 5 * 60;

/// A single per-second timing bucket.
///
/// Holds the aggregate of every sample recorded for one operation during one
/// epoch second.  A bucket with `count == 0` is a gap filler: all of its
/// numeric fields other than `t_epoch_s` are zero.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct TimingBucket {
    /// Unix epoch second for this bucket.
    pub t_epoch_s: u64,
    /// Number of operations completed in this second.
    pub count: u64,
    /// Sum of durations in microseconds.
    pub total_us: u64,
    /// Minimum duration in microseconds (0 when count == 0).
    pub min_us: u64,
    /// Maximum duration in microseconds (0 when count == 0).
    pub max_us: u64,
}

impl TimingBucket {
    /// Build a placeholder bucket for `t_epoch_s` that carries no samples:
    /// every counter and statistic is zero.
    fn empty(t_epoch_s: u64) -> Self {
        let (count, total_us, min_us, max_us) = (0, 0, 0, 0);
        Self {
            t_epoch_s,
            count,
            total_us,
            min_us,
            max_us,
        }
    }
}

/// Ring buffer of per-second timing buckets for a single operation.
///
/// The vector starts out empty and is sized to `MAX_HISTORY_SECS` slots when
/// the first sample is recorded.
struct OperationRing {
    /// Fixed-size ring buffer.  Index = epoch_second % capacity.
    ///
    /// A slot only describes a given second when its `t_epoch_s` equals that
    /// second; otherwise it still holds data from an earlier lap of the ring
    /// (or the initial placeholder) and is treated as empty by readers.
    buckets: Vec<TimingBucket>,
}

impl OperationRing {
    /// Create a ring with no storage allocated.
    ///
    /// The slot vector stays empty until the first call to `record`, so an
    /// operation that is registered but never sampled costs no bucket memory.
    fn new() -> Self {
        Self {
            buckets: Vec::new(),
        }
    }

    /// Map an epoch second to its slot index in the ring.
    ///
    /// The modulo is taken in `u64` *before* narrowing so the result does not
    /// depend on the width of `usize`.
    fn slot(epoch_s: u64) -> usize {
        (epoch_s % (MAX_HISTORY_SECS as u64)) as usize
    }

    /// Record a single timing sample at the given epoch second.
    ///
    /// * `epoch_s` — Unix epoch second the sample belongs to.
    /// * `duration_us` — sample duration in microseconds.
    ///
    /// If the target slot already aggregates `epoch_s`, the sample is folded
    /// into it; otherwise the slot is overwritten and starts a fresh bucket.
    /// `count` and `total_us` saturate at `u64::MAX` instead of overflowing.
    fn record(&mut self, epoch_s: u64, duration_us: u64) {
        if self.buckets.is_empty() {
            // First sample — allocate the ring with placeholder buckets.
            self.buckets
                .resize_with(MAX_HISTORY_SECS, || TimingBucket::empty(0));
        }

        let bucket = &mut self.buckets[Self::slot(epoch_s)];

        // `count > 0` distinguishes a real bucket from the zeroed placeholder.
        // Without it, a sample for epoch second 0 would be merged into the
        // placeholder (whose `t_epoch_s` is also 0) and `min_us` would be
        // stuck at 0.
        if bucket.count > 0 && bucket.t_epoch_s == epoch_s {
            // Same second — accumulate.
            bucket.count = bucket.count.saturating_add(1);
            bucket.total_us = bucket.total_us.saturating_add(duration_us);
            bucket.min_us = bucket.min_us.min(duration_us);
            bucket.max_us = bucket.max_us.max(duration_us);
        } else {
            // Placeholder or a second from an earlier lap — start afresh.
            *bucket = TimingBucket {
                t_epoch_s: epoch_s,
                count: 1,
                total_us: duration_us,
                min_us: duration_us,
                max_us: duration_us,
            };
        }
    }

    /// Return one bucket per second for the `seconds`-long window ending at
    /// `now_s`, i.e. `[now_s - seconds + 1 ..= now_s]`, oldest first.
    ///
    /// Seconds with no recorded data (including every second when nothing
    /// has been recorded yet) are filled with empty buckets so consumers see
    /// a contiguous series.  The window start saturates at epoch second 0, so
    /// fewer than `seconds` buckets are returned when `now_s < seconds - 1`.
    fn query(&self, now_s: u64, seconds: u64) -> Vec<TimingBucket> {
        let start = now_s.saturating_sub(seconds.saturating_sub(1));

        (start..=now_s)
            .map(|t| match self.buckets.get(Self::slot(t)) {
                // Slot holds exactly this second.
                Some(bucket) if bucket.t_epoch_s == t => *bucket,
                // Unallocated ring, or slot holds a different second.
                _ => TimingBucket::empty(t),
            })
            .collect()
    }
}

/// Thread-safe store of per-operation timing metrics.
///
/// Each operation (e.g. "publish", "storage_push") has its own ring buffer
/// of per-second aggregated buckets.
pub struct OperationTimingStore {
    /// Operation name → ring buffer, guarded by a single mutex that is held
    /// for the duration of each record or query call.
    rings: Mutex<HashMap<String, OperationRing>>,
}

impl OperationTimingStore {
    /// Create a store with no operations registered.
    pub fn new() -> Self {
        Self {
            rings: Mutex::new(HashMap::new()),
        }
    }

    /// Current wall-clock time as whole seconds since the Unix epoch.
    ///
    /// A system clock set before the epoch yields 0 rather than an error.
    fn now_epoch_s() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs()
    }

    /// Record an operation timing sample.
    ///
    /// `op` is the operation name (e.g. "publish", "storage_push").
    /// `duration` is how long the operation took.
    ///
    /// The sample is attributed to the current wall-clock second.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn record(&self, op: &str, duration: Duration) {
        self.record_at(op, Self::now_epoch_s(), duration);
    }

    /// Record with an explicit epoch second (for testing).
    ///
    /// Durations too large to express as `u64` microseconds are stored as
    /// `u64::MAX` instead of silently wrapping.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn record_at(&self, op: &str, epoch_s: u64, duration: Duration) {
        let duration_us = duration.as_micros().min(u128::from(u64::MAX)) as u64;
        let mut rings = self.rings.lock().expect("timing mutex poisoned");

        // Look up by `&str` first so the common case (operation already
        // known) does not allocate a key while the lock is held.
        if let Some(ring) = rings.get_mut(op) {
            ring.record(epoch_s, duration_us);
        } else {
            let mut ring = OperationRing::new();
            ring.record(epoch_s, duration_us);
            rings.insert(op.to_owned(), ring);
        }
    }

    /// Query timing data for all operations.
    ///
    /// Returns a map of operation name → vec of per-second buckets covering
    /// the last `seconds` seconds, ending at the current wall-clock second.
    /// `seconds` is clamped to `1..=MAX_HISTORY_SECS`.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn query_all(&self, seconds: u64) -> HashMap<String, Vec<TimingBucket>> {
        self.query_all_at(Self::now_epoch_s(), seconds)
    }

    /// Query with an explicit "now" epoch second (for testing).
    ///
    /// Only operations that have been recorded at least once appear in the
    /// result; each one gets a bucket for every second of the window.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn query_all_at(&self, now_s: u64, seconds: u64) -> HashMap<String, Vec<TimingBucket>> {
        let seconds = seconds.clamp(1, MAX_HISTORY_SECS as u64);
        let rings = self.rings.lock().expect("timing mutex poisoned");
        rings
            .iter()
            .map(|(op, ring)| (op.clone(), ring.query(now_s, seconds)))
            .collect()
    }
}

impl Default for OperationTimingStore {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for the timing store.  Every test uses the `*_at` variants
    //! so that results do not depend on the wall clock.

    use super::*;

    // Verifies: SYS-0017 — per-second bucket aggregation
    #[test]
    fn single_sample_creates_bucket() {
        let store = OperationTimingStore::new();
        store.record_at("publish", 1000, Duration::from_micros(500));

        let result = store.query_all_at(1000, 1);
        let buckets = &result["publish"];
        assert_eq!(buckets.len(), 1);
        assert_eq!(buckets[0].t_epoch_s, 1000);
        assert_eq!(buckets[0].count, 1);
        assert_eq!(buckets[0].total_us, 500);
        assert_eq!(buckets[0].min_us, 500);
        assert_eq!(buckets[0].max_us, 500);
    }

    // Verifies: SYS-0017 — multiple samples aggregate in same bucket
    #[test]
    fn multiple_samples_same_second_aggregate() {
        let store = OperationTimingStore::new();
        store.record_at("consume", 2000, Duration::from_micros(100));
        store.record_at("consume", 2000, Duration::from_micros(300));
        store.record_at("consume", 2000, Duration::from_micros(200));

        let result = store.query_all_at(2000, 1);
        let b = &result["consume"][0];
        assert_eq!(b.count, 3);
        assert_eq!(b.total_us, 600);
        assert_eq!(b.min_us, 100);
        assert_eq!(b.max_us, 300);
    }

    // Verifies: SYS-0017 — different operations tracked independently
    #[test]
    fn different_operations_independent() {
        let store = OperationTimingStore::new();
        store.record_at("publish", 3000, Duration::from_micros(10));
        store.record_at("consume", 3000, Duration::from_micros(20));

        let result = store.query_all_at(3000, 1);
        assert_eq!(result["publish"][0].total_us, 10);
        assert_eq!(result["consume"][0].total_us, 20);
    }

    // Verifies: API-0013 — empty buckets included in query window
    #[test]
    fn query_fills_gaps_with_empty_buckets() {
        let store = OperationTimingStore::new();
        store.record_at("ack", 1000, Duration::from_micros(50));
        store.record_at("ack", 1004, Duration::from_micros(75));

        let result = store.query_all_at(1004, 5);
        let buckets = &result["ack"];
        assert_eq!(buckets.len(), 5);

        // t=1000: has data
        assert_eq!(buckets[0].t_epoch_s, 1000);
        assert_eq!(buckets[0].count, 1);

        // t=1001..1003: empty
        for (i, bucket) in buckets.iter().enumerate().take(4).skip(1) {
            assert_eq!(bucket.t_epoch_s, 1000 + i as u64);
            assert_eq!(bucket.count, 0);
            assert_eq!(bucket.min_us, 0);
            assert_eq!(bucket.max_us, 0);
        }

        // t=1004: has data
        assert_eq!(buckets[4].t_epoch_s, 1004);
        assert_eq!(buckets[4].count, 1);
    }

    // Verifies: SYS-0017 — ring buffer wraps and evicts old data
    #[test]
    fn ring_buffer_wraps_after_max_history() {
        let store = OperationTimingStore::new();

        // Record at t=0
        store.record_at("op", 0, Duration::from_micros(10));
        // Record at t=MAX_HISTORY_SECS (same slot, should overwrite)
        store.record_at("op", MAX_HISTORY_SECS as u64, Duration::from_micros(20));

        let result = store.query_all_at(MAX_HISTORY_SECS as u64, 1);
        let b = &result["op"][0];
        assert_eq!(b.t_epoch_s, MAX_HISTORY_SECS as u64);
        assert_eq!(b.total_us, 20);
        assert_eq!(b.count, 1);

        // Old data at t=0 is gone — the slot now belongs to the newer second,
        // so a query for t=0 must come back as an empty bucket.
        let evicted = store.query_all_at(0, 1);
        let old = &evicted["op"][0];
        assert_eq!(old.t_epoch_s, 0);
        assert_eq!(old.count, 0);
        assert_eq!(old.total_us, 0);
    }

    // Verifies: API-0013 — seconds clamped to MAX_HISTORY_SECS
    #[test]
    fn query_clamps_to_max_history() {
        let store = OperationTimingStore::new();
        store.record_at("op", 5000, Duration::from_micros(1));

        // Request more than max
        let result = store.query_all_at(5000, 99999);
        let buckets = &result["op"];
        assert_eq!(buckets.len(), MAX_HISTORY_SECS);
    }

    // Verifies: API-0013 — empty store returns no operations (empty map)
    #[test]
    fn query_empty_store_returns_no_operations() {
        let store = OperationTimingStore::new();
        let result = store.query_all_at(1000, 10);
        assert!(result.is_empty());
    }

    // Verifies: API-0013 — query for operation with no data in window
    #[test]
    fn query_returns_empty_buckets_for_stale_data() {
        let store = OperationTimingStore::new();
        store.record_at("old_op", 100, Duration::from_micros(5));

        // Query at a much later time — the old data is out of the window
        let result = store.query_all_at(5000, 10);
        let buckets = &result["old_op"];
        assert_eq!(buckets.len(), 10);
        assert!(buckets.iter().all(|b| b.count == 0));
    }

    // Verifies: SYS-0017 — concurrent access safety
    #[test]
    fn concurrent_recording_is_safe() {
        use std::sync::Arc;
        use std::thread;

        let store = Arc::new(OperationTimingStore::new());
        let mut handles = vec![];

        // 10 writer threads, each spreading 100 samples over 5 seconds.
        for i in 0..10 {
            let store = Arc::clone(&store);
            handles.push(thread::spawn(move || {
                for j in 0..100 {
                    store.record_at(
                        "concurrent_op",
                        1000 + (j % 5),
                        Duration::from_micros(i * 10 + j),
                    );
                }
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        let result = store.query_all_at(1004, 5);
        let buckets = &result["concurrent_op"];
        let total_count: u64 = buckets.iter().map(|b| b.count).sum();
        assert_eq!(total_count, 1000); // 10 threads * 100 samples
    }

    // Verifies: SYS-0017 — bucket minimum tracked correctly across updates
    #[test]
    fn min_tracked_correctly() {
        let store = OperationTimingStore::new();
        store.record_at("op", 100, Duration::from_micros(500));
        store.record_at("op", 100, Duration::from_micros(100));
        store.record_at("op", 100, Duration::from_micros(300));

        let result = store.query_all_at(100, 1);
        assert_eq!(result["op"][0].min_us, 100);
        assert_eq!(result["op"][0].max_us, 500);
    }
}