1use std::collections::VecDeque;
26use std::sync::Mutex;
27
/// Default byte capacity (64 KiB) used by `with_default_capacity`.
const DEFAULT_CAPACITY: usize = 64 * 1024;

/// Per-frame accounting overhead in bytes: the 4-byte little-endian length
/// prefix that `drain_all` emits ahead of each frame's payload.
const FRAME_OVERHEAD: usize = 4;
37
/// Mutex-protected state of the ring buffer.
struct Inner {
    // Queued frames, oldest at the front.
    frames: VecDeque<Vec<u8>>,
    // Running total of `FRAME_OVERHEAD + frame.len()` over `frames`.
    bytes_used: usize,
    // Upper bound that `bytes_used` is kept at or below.
    capacity: usize,
}
51
52impl Inner {
53 fn new(capacity: usize) -> Self {
55 Self {
56 frames: VecDeque::new(),
57 bytes_used: 0,
58 capacity,
59 }
60 }
61
62 #[inline]
64 fn frame_cost(frame: &[u8]) -> usize {
65 FRAME_OVERHEAD + frame.len()
66 }
67
68 fn drop_oldest(&mut self) -> bool {
71 if let Some(old) = self.frames.pop_front() {
72 self.bytes_used -= Self::frame_cost(&old);
73 true
74 } else {
75 false
76 }
77 }
78}
79
/// A bounded, thread-safe FIFO of byte frames.
///
/// Each frame is accounted as its payload length plus `FRAME_OVERHEAD`.
/// When a push would exceed the configured byte capacity, the oldest
/// frames are evicted to make room. Every operation takes the inner
/// `Mutex`, so `&self` methods are safe to call from multiple threads.
pub struct ConduitRingBuffer {
    inner: Mutex<Inner>,
}
99impl ConduitRingBuffer {
100 pub fn new(capacity: usize) -> Self {
107 assert!(
108 capacity >= FRAME_OVERHEAD,
109 "capacity must be at least {FRAME_OVERHEAD} bytes"
110 );
111 Self {
112 inner: Mutex::new(Inner::new(capacity)),
113 }
114 }
115
116 pub fn with_default_capacity() -> Self {
118 Self::new(DEFAULT_CAPACITY)
119 }
120
121 #[must_use]
130 pub fn push(&self, frame: &[u8]) -> usize {
131 let cost = Inner::frame_cost(frame);
132 let mut inner = self.inner.lock().expect("ring buffer lock poisoned");
133
134 if cost > inner.capacity {
136 return 0;
137 }
138
139 let mut dropped = 0usize;
140 while inner.bytes_used + cost > inner.capacity {
141 if !inner.drop_oldest() {
142 break;
143 }
144 dropped += 1;
145 }
146
147 inner.frames.push_back(frame.to_vec());
148 inner.bytes_used += cost;
149 dropped
150 }
151
152 #[must_use]
166 pub fn drain_all(&self) -> Vec<u8> {
167 let mut inner = self.inner.lock().expect("ring buffer lock poisoned");
168
169 if inner.frames.is_empty() {
170 return Vec::new();
171 }
172
173 let output_size = 4 + inner.bytes_used;
175 let mut buf = Vec::with_capacity(output_size);
176
177 let count = inner.frames.len() as u32;
179 buf.extend_from_slice(&count.to_le_bytes());
180
181 for frame in &inner.frames {
183 let len = frame.len() as u32;
184 buf.extend_from_slice(&len.to_le_bytes());
185 buf.extend_from_slice(frame);
186 }
187
188 inner.frames.clear();
190 inner.bytes_used = 0;
191
192 buf
193 }
194
195 #[must_use]
199 pub fn try_pop(&self) -> Option<Vec<u8>> {
200 let mut inner = self.inner.lock().expect("ring buffer lock poisoned");
201 let frame = inner.frames.pop_front()?;
202 inner.bytes_used -= Inner::frame_cost(&frame);
203 Some(frame)
204 }
205
206 #[must_use]
208 pub fn frame_count(&self) -> usize {
209 self.inner
210 .lock()
211 .expect("ring buffer lock poisoned")
212 .frames
213 .len()
214 }
215
216 #[must_use]
218 pub fn bytes_used(&self) -> usize {
219 self.inner
220 .lock()
221 .expect("ring buffer lock poisoned")
222 .bytes_used
223 }
224
225 #[must_use]
227 pub fn capacity(&self) -> usize {
228 self.inner
229 .lock()
230 .expect("ring buffer lock poisoned")
231 .capacity
232 }
233
234 pub fn clear(&self) {
236 let mut inner = self.inner.lock().expect("ring buffer lock poisoned");
237 inner.frames.clear();
238 inner.bytes_used = 0;
239 }
240}
241
242impl std::fmt::Debug for ConduitRingBuffer {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 let inner = self.inner.lock().expect("ring buffer lock poisoned");
245 f.debug_struct("ConduitRingBuffer")
246 .field("frame_count", &inner.frames.len())
247 .field("bytes_used", &inner.bytes_used)
248 .field("capacity", &inner.capacity)
249 .finish()
250 }
251}
252
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in FIFO order, one per `try_pop`.
    #[test]
    fn push_and_pop() {
        let rb = ConduitRingBuffer::new(1024);
        let _ = rb.push(b"alpha");
        let _ = rb.push(b"beta");
        let _ = rb.push(b"gamma");

        assert_eq!(rb.frame_count(), 3);
        assert_eq!(rb.try_pop().unwrap(), b"alpha");
        assert_eq!(rb.try_pop().unwrap(), b"beta");
        assert_eq!(rb.try_pop().unwrap(), b"gamma");
        assert!(rb.try_pop().is_none());
    }

    /// Pins the drain wire format: LE u32 count, then per frame an
    /// LE u32 length followed by the payload. Drain must also reset state.
    #[test]
    fn drain_all_format() {
        let rb = ConduitRingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Count prefix at bytes [0, 4).
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        // First frame: length at [4, 8), payload immediately after.
        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        // Second frame starts right after the first payload.
        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // Draining empties the buffer entirely.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// With capacity 16, two 4-byte frames (cost 8 each) fill the buffer
    /// exactly; a third push must evict exactly one frame (the oldest).
    #[test]
    fn overflow_drops_oldest() {
        let rb = ConduitRingBuffer::new(16);

        let dropped = rb.push(b"aaaa");
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"bbbb");
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"cccc");
        assert_eq!(dropped, 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// Draining an empty buffer yields an empty blob, not a lone count prefix.
    #[test]
    fn empty_drain() {
        let rb = ConduitRingBuffer::new(1024);
        let blob = rb.drain_all();
        assert!(blob.is_empty());
    }

    /// Byte accounting: each frame costs payload length + 4 bytes overhead.
    #[test]
    fn frame_count_and_bytes() {
        let rb = ConduitRingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        // "abc": 3 + 4 overhead = 7.
        let _ = rb.push(b"abc");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        // "de": 2 + 4 overhead = 6; total 13.
        let _ = rb.push(b"de");
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        // Popping "abc" releases its full cost (7), leaving 6.
        let _ = rb.try_pop();
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    /// `clear` discards everything and resets accounting to zero.
    #[test]
    fn clear() {
        let rb = ConduitRingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    /// One producer and one consumer task hammer the buffer concurrently.
    /// The capacity (64 KiB) is large enough that nothing is evicted, so
    /// every pushed frame is either popped or still queued at the end.
    #[tokio::test]
    async fn concurrent_push_pop() {
        use std::sync::Arc;

        let rb = Arc::new(ConduitRingBuffer::new(64 * 1024));
        let rb_producer = Arc::clone(&rb);
        let rb_consumer = Arc::clone(&rb);

        let producer = tokio::spawn(async move {
            for i in 0u32..1000 {
                let _ = rb_producer.push(&i.to_le_bytes());
            }
        });

        let consumer = tokio::spawn(async move {
            let mut popped = 0usize;
            loop {
                if let Some(_frame) = rb_consumer.try_pop() {
                    popped += 1;
                } else {
                    // Buffer momentarily empty: yield so the producer runs.
                    tokio::task::yield_now().await;
                }
                if popped >= 1000 {
                    break;
                }
            }
            popped
        });

        producer.await.unwrap();
        let consumer_popped = consumer.await.unwrap();

        // Conservation: popped + still-queued == total produced.
        let remaining = rb.frame_count();
        assert_eq!(consumer_popped + remaining, 1000);
    }

    /// A frame whose cost exceeds capacity is rejected: nothing evicted
    /// (dropped == 0) and the previously queued frame survives.
    #[test]
    fn single_large_frame() {
        let rb = ConduitRingBuffer::new(32);
        let _ = rb.push(b"ok");
        let dropped = rb.push(&[0xFFu8; 100]);
        assert_eq!(dropped, 0);
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    /// The buffer remains fully usable after a drain.
    #[test]
    fn drain_then_push() {
        let rb = ConduitRingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    /// Capacity 8 holds "aaaa" (cost 8) exactly; a 6-byte frame (cost 10)
    /// can never fit, so it is rejected without evicting the resident frame.
    #[test]
    fn overflow_cascade() {
        let rb = ConduitRingBuffer::new(8);

        let _ = rb.push(b"aaaa");
        assert_eq!(rb.frame_count(), 1);

        let dropped = rb.push(&[0u8; 6]);
        assert_eq!(dropped, 0);

        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"aaaa");
    }

    /// Capacities below FRAME_OVERHEAD are rejected at construction.
    #[test]
    #[should_panic(expected = "capacity must be at least")]
    fn tiny_capacity_panics() {
        ConduitRingBuffer::new(3);
    }

    /// Default constructor uses the 64 KiB default capacity.
    #[test]
    fn with_default_capacity() {
        let rb = ConduitRingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}