Skip to main content

netpulse/stats/
ring_buffer.rs

// src/stats/ring_buffer.rs — Fixed-size circular buffer
//
// The RingBuffer stores the N most recent ProbeResults per target.
// Older samples are automatically overwritten when the buffer is full.
//
// Wrapped in Arc<Mutex<RingBuffer>> for sharing between the prober
// task and the stats engine. This is the first step — later you could
// upgrade to crossbeam's lock-free SegQueue for even lower contention.
9
10use crate::probers::ProbeResult;
11use std::collections::VecDeque;
12
13/// A fixed-capacity circular buffer of ProbeResults.
14///
15/// When the buffer is full, the oldest entry is evicted.
16pub struct RingBuffer {
17    capacity: usize,
18    inner: VecDeque<ProbeResult>,
19}
20
21impl RingBuffer {
22    /// Create a new ring buffer with the given capacity.
23    pub fn new(capacity: usize) -> Self {
24        assert!(capacity > 0, "RingBuffer capacity must be > 0");
25        Self {
26            capacity,
27            inner: VecDeque::with_capacity(capacity),
28        }
29    }
30
31    /// Push a new probe result, evicting the oldest if at capacity.
32    pub fn push(&mut self, result: ProbeResult) {
33        if self.inner.len() == self.capacity {
34            self.inner.pop_front();
35        }
36        self.inner.push_back(result);
37    }
38
39    /// Return a snapshot of all current samples as a Vec.
40    /// This is a clone — callers get their own data and the lock is
41    /// released as soon as the borrow ends.
42    pub fn snapshot(&self) -> Vec<ProbeResult> {
43        self.inner.iter().cloned().collect()
44    }
45
46    /// How many samples are currently stored.
47    pub fn len(&self) -> usize {
48        self.inner.len()
49    }
50
51    /// True if the buffer contains no samples.
52    pub fn is_empty(&self) -> bool {
53        self.inner.is_empty()
54    }
55
56    /// Current capacity.
57    pub fn capacity(&self) -> usize {
58        self.capacity
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Build a `ProbeResult` for the fixed target "test" with the given RTT
    /// and sequence number, timestamped with the current time.
    fn make_result(rtt_us: Option<u64>, seq: u64) -> ProbeResult {
        ProbeResult {
            target: "test".to_string(),
            rtt_us,
            timestamp: Utc::now(),
            seq,
            responder_ip: None,
        }
    }

    /// Pushing past capacity keeps only the newest samples, in arrival order.
    #[test]
    fn test_capacity_eviction() {
        let mut rb = RingBuffer::new(3);
        for i in 0..5u64 {
            rb.push(make_result(Some(i * 1000), i));
        }
        // Should only hold the last 3
        assert_eq!(rb.len(), 3);
        let snap = rb.snapshot();
        assert_eq!(snap[0].seq, 2);
        assert_eq!(snap[1].seq, 3);
        assert_eq!(snap[2].seq, 4);
    }

    /// A fresh buffer is empty but still reports its configured capacity.
    #[test]
    fn test_empty() {
        let rb = RingBuffer::new(10);
        assert!(rb.is_empty());
        assert_eq!(rb.len(), 0);
        assert_eq!(rb.capacity(), 10);
        assert!(rb.snapshot().is_empty());
    }

    /// A snapshot is an independent copy: later pushes do not alter it.
    #[test]
    fn test_snapshot_is_clone() {
        let mut rb = RingBuffer::new(5);
        rb.push(make_result(Some(1000), 0));
        let snap = rb.snapshot();
        // Mutating original doesn't affect snapshot
        rb.push(make_result(Some(2000), 1));
        assert_eq!(snap.len(), 1);
        assert_eq!(rb.len(), 2);
    }

    /// Below capacity nothing is evicted.
    #[test]
    fn test_no_eviction_below_capacity() {
        let mut rb = RingBuffer::new(4);
        for i in 0..3u64 {
            rb.push(make_result(None, i));
        }
        assert_eq!(rb.len(), 3);
        assert_eq!(rb.snapshot()[0].seq, 0);
    }

    /// Constructing a zero-capacity buffer is rejected with a panic.
    #[test]
    #[should_panic(expected = "RingBuffer capacity must be > 0")]
    fn test_zero_capacity_panics() {
        let _ = RingBuffer::new(0);
    }
}
107}