Skip to main content

rangebar_streaming/
ring_buffer.rs

1//! Fixed-size ring buffer for streaming range bars (Issue #96 Task #9)
2//!
3//! Replaces unbounded Vec with circular buffer to prevent OOM in long-running sidecars.
4//! When full, drops old bars gracefully instead of blocking producers.
5//!
6//! Design:
7//! - Fixed capacity (default 10K bars = 5MB)
8//! - O(1) push/pop operations
9//! - Thread-safe via parking_lot::Mutex (Issue #89)
10//! - Metrics for backpressure, dropped bars, max depth
11
12use std::sync::Arc;
13use parking_lot::Mutex;
14use std::sync::atomic::{AtomicU64, Ordering};
15
16/// A single slot in the ring buffer
17#[derive(Clone, Debug)]
18pub struct RingBufferSlot<T: Clone> {
19    value: Option<T>,
20}
21
22/// Fixed-size ring buffer with metrics
23pub struct RingBuffer<T: Clone> {
24    buffer: Vec<RingBufferSlot<T>>,
25    write_idx: usize,  // Where next item will be written
26    read_idx: usize,   // Where next item will be read
27    count: usize,      // Number of items currently in buffer
28    /// Metrics accessible from outside
29    metrics: Arc<RingBufferMetrics>,
30}
31
32/// Metrics for ring buffer operations
33#[derive(Debug, Default)]
34pub struct RingBufferMetrics {
35    /// Number of items pushed
36    pub total_pushed: AtomicU64,
37    /// Number of items dropped (due to full buffer)
38    pub total_dropped: AtomicU64,
39    /// Number of items popped
40    pub total_popped: AtomicU64,
41    /// Maximum depth observed
42    pub max_depth: AtomicU64,
43}
44
45impl<T: Clone> RingBuffer<T> {
46    /// Create a new ring buffer with given capacity
47    pub fn new(capacity: usize) -> Self {
48        let buffer = vec![
49            RingBufferSlot { value: None };
50            capacity.max(1)
51        ];
52        Self {
53            buffer,
54            write_idx: 0,
55            read_idx: 0,
56            count: 0,
57            metrics: Arc::new(RingBufferMetrics::default()),
58        }
59    }
60
61    /// Push an item into the buffer
62    /// Returns true if item was added, false if buffer was full and old item was dropped
63    pub fn push(&mut self, item: T) -> bool {
64        let was_full = self.is_full();
65
66        // If full, drop old item
67        if was_full {
68            self.buffer[self.read_idx].value = None;
69            self.read_idx = (self.read_idx + 1) % self.buffer.len();
70            self.count = self.count.saturating_sub(1);
71            self.metrics.total_dropped.fetch_add(1, Ordering::Relaxed);
72        }
73
74        // Push new item
75        self.buffer[self.write_idx].value = Some(item);
76        self.write_idx = (self.write_idx + 1) % self.buffer.len();
77        self.count += 1;
78
79        // Update metrics
80        self.metrics.total_pushed.fetch_add(1, Ordering::Relaxed);
81        let current_depth = self.count as u64;
82        let _ = self.metrics.max_depth.fetch_max(current_depth, Ordering::Relaxed);
83
84        !was_full
85    }
86
87    /// Pop an item from the buffer
88    pub fn pop(&mut self) -> Option<T> {
89        if self.count == 0 {
90            return None;
91        }
92
93        let slot = &mut self.buffer[self.read_idx];
94        let item = slot.value.take();
95        self.read_idx = (self.read_idx + 1) % self.buffer.len();
96        self.count = self.count.saturating_sub(1);
97
98        if item.is_some() {
99            self.metrics.total_popped.fetch_add(1, Ordering::Relaxed);
100        }
101
102        item
103    }
104
105    /// Check if buffer is empty
106    pub fn is_empty(&self) -> bool {
107        self.count == 0
108    }
109
110    /// Check if buffer is full
111    pub fn is_full(&self) -> bool {
112        self.count >= self.buffer.len()
113    }
114
115    /// Get current count
116    pub fn len(&self) -> usize {
117        self.count
118    }
119
120    /// Get capacity
121    pub fn capacity(&self) -> usize {
122        self.buffer.len()
123    }
124
125    /// Get metrics reference
126    pub fn metrics(&self) -> Arc<RingBufferMetrics> {
127        Arc::clone(&self.metrics)
128    }
129
130    /// Clear all items
131    pub fn clear(&mut self) {
132        for slot in &mut self.buffer {
133            slot.value = None;
134        }
135        self.write_idx = 0;
136        self.read_idx = 0;
137        self.count = 0;
138    }
139}
140
141/// Thread-safe wrapper for RingBuffer
142pub struct ConcurrentRingBuffer<T: Clone + Send + 'static> {
143    inner: Arc<Mutex<RingBuffer<T>>>,
144}
145
146impl<T: Clone + Send + 'static> ConcurrentRingBuffer<T> {
147    /// Create a new concurrent ring buffer
148    pub fn new(capacity: usize) -> Self {
149        Self {
150            inner: Arc::new(Mutex::new(RingBuffer::new(capacity))),
151        }
152    }
153
154    /// Push an item (returns true if not dropped, false if dropped due to full)
155    pub fn push(&self, item: T) -> bool {
156        let mut buf = self.inner.lock();
157        buf.push(item)
158    }
159
160    /// Pop an item
161    pub fn pop(&self) -> Option<T> {
162        let mut buf = self.inner.lock();
163        buf.pop()
164    }
165
166    /// Get current length
167    pub fn len(&self) -> usize {
168        self.inner.lock().len()
169    }
170
171    /// Get capacity
172    pub fn capacity(&self) -> usize {
173        self.inner.lock().capacity()
174    }
175
176    /// Get metrics
177    pub fn metrics(&self) -> Arc<RingBufferMetrics> {
178        self.inner.lock().metrics()
179    }
180
181    /// Cloneable reference
182    pub fn clone_ref(&self) -> Self {
183        Self {
184            inner: Arc::clone(&self.inner),
185        }
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192
193    #[test]
194    fn test_ring_buffer_basic() {
195        let mut buf = RingBuffer::new(3);
196        assert!(buf.is_empty());
197        assert!(!buf.is_full());
198
199        assert!(buf.push(1));
200        assert!(!buf.is_empty());
201        assert!(!buf.is_full());
202
203        assert!(buf.push(2));
204        assert!(buf.push(3));
205        assert!(buf.is_full());
206
207        // Next push drops oldest
208        assert!(!buf.push(4));
209        assert_eq!(buf.len(), 3);
210        assert_eq!(buf.metrics().total_dropped.load(Ordering::Relaxed), 1);
211
212        // Pop returns items in order
213        assert_eq!(buf.pop(), Some(2));
214        assert_eq!(buf.pop(), Some(3));
215        assert_eq!(buf.pop(), Some(4));
216        assert_eq!(buf.pop(), None);
217    }
218
219    #[test]
220    fn test_ring_buffer_metrics() {
221        let mut buf = RingBuffer::new(2);
222
223        buf.push(1);
224        buf.push(2);
225        assert_eq!(buf.metrics().total_pushed.load(Ordering::Relaxed), 2);
226        assert_eq!(buf.metrics().max_depth.load(Ordering::Relaxed), 2);
227
228        buf.push(3);  // Drops 1
229        assert_eq!(buf.metrics().total_dropped.load(Ordering::Relaxed), 1);
230
231        buf.pop();
232        buf.pop();
233        buf.pop();
234        assert_eq!(buf.metrics().total_popped.load(Ordering::Relaxed), 3);
235    }
236
237    #[test]
238    fn test_concurrent_ring_buffer() {
239        let buf = ConcurrentRingBuffer::new(2);
240        assert!(buf.push(1));
241        assert!(buf.push(2));
242        assert!(!buf.push(3));  // Drops
243
244        assert_eq!(buf.pop(), Some(2));
245        assert_eq!(buf.len(), 1);
246    }
247}