rangebar_streaming/
ring_buffer.rs1use std::sync::Arc;
13use parking_lot::Mutex;
14use std::sync::atomic::{AtomicU64, Ordering};
15
/// One cell of ring-buffer storage.
///
/// `value` is `Some` while the cell holds an element and `None` while it is vacant.
#[derive(Debug, Clone)]
pub struct RingBufferSlot<T>
where
    T: Clone,
{
    value: Option<T>,
}
21
22pub struct RingBuffer<T: Clone> {
24 buffer: Vec<RingBufferSlot<T>>,
25 write_idx: usize, read_idx: usize, count: usize, metrics: Arc<RingBufferMetrics>,
30}
31
/// Atomic counters describing ring-buffer activity.
///
/// Every counter starts at zero when built through `Default`.
#[derive(Default, Debug)]
pub struct RingBufferMetrics {
    /// Incremented once per `push` call.
    pub total_pushed: AtomicU64,
    /// Incremented each time a `push` evicts the oldest element of a full buffer.
    pub total_dropped: AtomicU64,
    /// Incremented for every element returned by `pop`.
    pub total_popped: AtomicU64,
    /// Largest element count observed immediately after a `push`.
    pub max_depth: AtomicU64,
}
44
45impl<T: Clone> RingBuffer<T> {
46 pub fn new(capacity: usize) -> Self {
48 let buffer = vec![
49 RingBufferSlot { value: None };
50 capacity.max(1)
51 ];
52 Self {
53 buffer,
54 write_idx: 0,
55 read_idx: 0,
56 count: 0,
57 metrics: Arc::new(RingBufferMetrics::default()),
58 }
59 }
60
61 pub fn push(&mut self, item: T) -> bool {
64 let was_full = self.is_full();
65
66 if was_full {
68 self.buffer[self.read_idx].value = None;
69 self.read_idx = (self.read_idx + 1) % self.buffer.len();
70 self.count = self.count.saturating_sub(1);
71 self.metrics.total_dropped.fetch_add(1, Ordering::Relaxed);
72 }
73
74 self.buffer[self.write_idx].value = Some(item);
76 self.write_idx = (self.write_idx + 1) % self.buffer.len();
77 self.count += 1;
78
79 self.metrics.total_pushed.fetch_add(1, Ordering::Relaxed);
81 let current_depth = self.count as u64;
82 let _ = self.metrics.max_depth.fetch_max(current_depth, Ordering::Relaxed);
83
84 !was_full
85 }
86
87 pub fn pop(&mut self) -> Option<T> {
89 if self.count == 0 {
90 return None;
91 }
92
93 let slot = &mut self.buffer[self.read_idx];
94 let item = slot.value.take();
95 self.read_idx = (self.read_idx + 1) % self.buffer.len();
96 self.count = self.count.saturating_sub(1);
97
98 if item.is_some() {
99 self.metrics.total_popped.fetch_add(1, Ordering::Relaxed);
100 }
101
102 item
103 }
104
105 pub fn is_empty(&self) -> bool {
107 self.count == 0
108 }
109
110 pub fn is_full(&self) -> bool {
112 self.count >= self.buffer.len()
113 }
114
115 pub fn len(&self) -> usize {
117 self.count
118 }
119
120 pub fn capacity(&self) -> usize {
122 self.buffer.len()
123 }
124
125 pub fn metrics(&self) -> Arc<RingBufferMetrics> {
127 Arc::clone(&self.metrics)
128 }
129
130 pub fn clear(&mut self) {
132 for slot in &mut self.buffer {
133 slot.value = None;
134 }
135 self.write_idx = 0;
136 self.read_idx = 0;
137 self.count = 0;
138 }
139}
140
141pub struct ConcurrentRingBuffer<T: Clone + Send + 'static> {
143 inner: Arc<Mutex<RingBuffer<T>>>,
144}
145
146impl<T: Clone + Send + 'static> ConcurrentRingBuffer<T> {
147 pub fn new(capacity: usize) -> Self {
149 Self {
150 inner: Arc::new(Mutex::new(RingBuffer::new(capacity))),
151 }
152 }
153
154 pub fn push(&self, item: T) -> bool {
156 let mut buf = self.inner.lock();
157 buf.push(item)
158 }
159
160 pub fn pop(&self) -> Option<T> {
162 let mut buf = self.inner.lock();
163 buf.pop()
164 }
165
166 pub fn len(&self) -> usize {
168 self.inner.lock().len()
169 }
170
171 pub fn capacity(&self) -> usize {
173 self.inner.lock().capacity()
174 }
175
176 pub fn metrics(&self) -> Arc<RingBufferMetrics> {
178 self.inner.lock().metrics()
179 }
180
181 pub fn clone_ref(&self) -> Self {
183 Self {
184 inner: Arc::clone(&self.inner),
185 }
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;

    /// Fill, overflow and drain a three-slot buffer.
    #[test]
    fn test_ring_buffer_basic() {
        let mut buf = RingBuffer::new(3);
        assert!(buf.is_empty());
        assert!(!buf.is_full());

        // First element: neither empty nor full.
        assert!(buf.push(1));
        assert!(!buf.is_empty());
        assert!(!buf.is_full());

        // Two more elements reach capacity.
        assert!(buf.push(2));
        assert!(buf.push(3));
        assert!(buf.is_full());

        // Pushing into a full buffer reports the eviction and keeps the length at capacity.
        assert!(!buf.push(4));
        assert_eq!(buf.len(), 3);
        let dropped = buf.metrics().total_dropped.load(Ordering::Relaxed);
        assert_eq!(dropped, 1);

        // The evicted `1` is gone; the rest come out oldest-first.
        assert_eq!(buf.pop(), Some(2));
        assert_eq!(buf.pop(), Some(3));
        assert_eq!(buf.pop(), Some(4));
        assert_eq!(buf.pop(), None);
    }

    /// Counters track pushes, evictions, pops and peak depth.
    #[test]
    fn test_ring_buffer_metrics() {
        let mut buf = RingBuffer::new(2);

        buf.push(1);
        buf.push(2);
        assert_eq!(buf.metrics().total_pushed.load(Ordering::Relaxed), 2);
        assert_eq!(buf.metrics().max_depth.load(Ordering::Relaxed), 2);

        // Third push overflows the two-slot buffer.
        buf.push(3);
        assert_eq!(buf.metrics().total_dropped.load(Ordering::Relaxed), 1);

        // Only two elements remain, so the third pop returns nothing and is not counted.
        buf.pop();
        buf.pop();
        buf.pop();
        assert_eq!(buf.metrics().total_popped.load(Ordering::Relaxed), 2);
    }

    /// The shared wrapper shows the same overwrite-oldest behaviour.
    #[test]
    fn test_concurrent_ring_buffer() {
        let buf = ConcurrentRingBuffer::new(2);
        assert!(buf.push(1));
        assert!(buf.push(2));

        // Overflow evicts `1`, so `2` is now the oldest element.
        assert!(!buf.push(3));
        assert_eq!(buf.pop(), Some(2));
        assert_eq!(buf.len(), 1);
    }
}