// conduit_core/ringbuf.rs

//! In-process ring buffer for high-frequency streaming.
//!
//! [`RingBuffer`] is the breakthrough component of tauri-conduit: an
//! in-process circular buffer that lets the Rust backend stream binary frames
//! to the WebView frontend without serialization, IPC, or inter-process shared
//! memory. The custom protocol handler (`conduit://`) reads directly from it.
//!
//! # Design
//!
//! The buffer stores variable-length frames with a configurable byte budget
//! (default 64 KB). When the budget is exceeded, the oldest frames are dropped
//! to make room — this is lossy by design, because the JS consumer is expected
//! to drain fast enough for real-time use cases (market data, sensor telemetry,
//! audio buffers).
//!
//! # Wire format (`drain_all`)
//!
//! ```text
//! [u32 LE frame_count]
//! [u32 LE len_1][bytes_1]
//! [u32 LE len_2][bytes_2]
//! ...
//! ```

use std::collections::VecDeque;
use std::sync::{Mutex, MutexGuard};

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Default capacity in bytes (64 KiB).
const DEFAULT_CAPACITY: usize = 64 * 1024;

/// Per-frame overhead: 4 bytes for the u32 LE length prefix.
const FRAME_OVERHEAD: usize = 4;

/// Largest frame count / payload length that the u32 fields of the
/// `drain_all` wire format can represent, expressed as a `usize`.
///
/// The cast is lossless wherever `usize` is at least 32 bits wide; on a
/// 32-bit target the value equals `usize::MAX`, so the checks that use it can
/// never trigger there.
const WIRE_U32_MAX: usize = u32::MAX as usize;

// ---------------------------------------------------------------------------
// Inner
// ---------------------------------------------------------------------------

/// The unsynchronized interior of the ring buffer.
///
/// Invariant maintained by every method that mutates it: `bytes_used` equals
/// the summed [`Inner::frame_cost`] of the queued frames and never exceeds
/// `capacity`.
struct Inner {
    /// Buffered frames in FIFO order (front = oldest).
    frames: VecDeque<Vec<u8>>,
    /// Total bytes used: sum of (FRAME_OVERHEAD + frame.len()) for each frame.
    bytes_used: usize,
    /// Maximum byte budget.
    capacity: usize,
}

impl Inner {
    /// Create an empty inner buffer with the given byte budget.
    fn new(capacity: usize) -> Self {
        Self {
            frames: VecDeque::new(),
            bytes_used: 0,
            capacity,
        }
    }

    /// Cost of storing a single frame (length prefix + payload).
    #[inline]
    fn frame_cost(frame: &[u8]) -> usize {
        FRAME_OVERHEAD + frame.len()
    }

    /// Remove and return the oldest frame, releasing its share of the byte
    /// budget. Returns `None` when nothing is buffered.
    fn pop_oldest(&mut self) -> Option<Vec<u8>> {
        let frame = self.frames.pop_front()?;
        self.bytes_used -= Self::frame_cost(&frame);
        Some(frame)
    }

    /// Drop the oldest frame, adjusting the byte counter. Returns `true` if
    /// a frame was actually removed.
    fn drop_oldest(&mut self) -> bool {
        self.pop_oldest().is_some()
    }

    /// Discard every buffered frame and reset the byte counter.
    fn clear(&mut self) {
        self.frames.clear();
        self.bytes_used = 0;
    }
}

// ---------------------------------------------------------------------------
// RingBuffer
// ---------------------------------------------------------------------------

/// Thread-safe, in-process circular buffer for streaming binary frames.
///
/// Frames are variable-length byte slices stored with a u32 LE length prefix.
/// The buffer enforces a byte budget; when a push would exceed the budget the
/// oldest frames are silently dropped (lossy back-pressure).
///
/// # Thread safety
///
/// All public methods take `&self` and synchronize via an internal [`Mutex`].
/// Contention is expected to be low: typically one producer thread and one
/// consumer (the custom protocol handler draining on a `fetch` call).
pub struct RingBuffer {
    inner: Mutex<Inner>,
}

impl RingBuffer {
    /// Create a ring buffer with the given byte capacity.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is less than [`FRAME_OVERHEAD`] (4 bytes), since
    /// no frame could ever fit.
    pub fn new(capacity: usize) -> Self {
        assert!(
            capacity >= FRAME_OVERHEAD,
            "capacity must be at least {FRAME_OVERHEAD} bytes"
        );
        Self {
            inner: Mutex::new(Inner::new(capacity)),
        }
    }

    /// Create a ring buffer with the default capacity (64 KiB).
    pub fn with_default_capacity() -> Self {
        Self::new(DEFAULT_CAPACITY)
    }

    /// Acquire the interior lock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned, i.e. another thread panicked while
    /// holding it.
    fn lock(&self) -> MutexGuard<'_, Inner> {
        self.inner.lock().expect("ring buffer lock poisoned")
    }

    /// Push a frame into the buffer.
    ///
    /// If the frame (plus its 4-byte length prefix) would exceed the byte
    /// budget, the oldest frames are dropped until there is room. Returns the
    /// number of frames that were dropped to make space.
    ///
    /// A frame is silently discarded, and `0` is returned, when it can never
    /// be stored: either its cost is larger than the total capacity, or its
    /// length does not fit the u32 length prefix of the wire format. The
    /// return value alone therefore does not distinguish "stored without
    /// eviction" from "discarded".
    #[must_use]
    pub fn push(&self, frame: &[u8]) -> usize {
        // `drain_all` encodes each length as a u32; a longer payload would be
        // written with a wrapped length and corrupt the blob.
        if frame.len() > WIRE_U32_MAX {
            return 0;
        }

        let cost = Inner::frame_cost(frame);
        let mut inner = self.lock();

        // Frame too large for this buffer — discard it.
        if cost > inner.capacity {
            return 0;
        }

        // Evict until the frame fits both limits:
        //  * the byte budget — written as a subtraction, which cannot
        //    overflow because `bytes_used <= capacity` always holds;
        //  * the u32 frame-count header of the wire format.
        let mut dropped = 0usize;
        while (inner.frames.len() >= WIRE_U32_MAX || cost > inner.capacity - inner.bytes_used)
            && inner.drop_oldest()
        {
            dropped += 1;
        }

        inner.frames.push_back(frame.to_vec());
        inner.bytes_used += cost;
        dropped
    }

    /// Drain all buffered frames into a single binary blob and clear the
    /// buffer.
    ///
    /// # Wire format
    ///
    /// ```text
    /// [u32 LE frame_count]
    /// [u32 LE len_1][bytes_1]
    /// [u32 LE len_2][bytes_2]
    /// ...
    /// ```
    ///
    /// Returns an empty `Vec` if the buffer is empty.
    #[must_use]
    pub fn drain_all(&self) -> Vec<u8> {
        let mut inner = self.lock();

        if inner.frames.is_empty() {
            return Vec::new();
        }

        // Checked conversions instead of `as`: both limits are enforced by
        // `push`, so a failure here would be a broken invariant rather than a
        // silently truncated header.
        let count =
            u32::try_from(inner.frames.len()).expect("push caps the frame count at u32::MAX");

        // `bytes_used` already accounts for every per-frame length prefix, so
        // only the count header has to be added to get the exact blob size.
        let mut blob = Vec::with_capacity(std::mem::size_of::<u32>() + inner.bytes_used);
        blob.extend_from_slice(&count.to_le_bytes());

        // Each frame: [u32 LE len][bytes].
        for frame in &inner.frames {
            let len =
                u32::try_from(frame.len()).expect("push rejects frames longer than u32::MAX");
            blob.extend_from_slice(&len.to_le_bytes());
            blob.extend_from_slice(frame);
        }

        inner.clear();
        blob
    }

    /// Read one frame from the front of the buffer (FIFO).
    ///
    /// Returns `None` if the buffer is empty.
    #[must_use]
    pub fn try_pop(&self) -> Option<Vec<u8>> {
        self.lock().pop_oldest()
    }

    /// Number of frames currently buffered.
    #[must_use]
    pub fn frame_count(&self) -> usize {
        self.lock().frames.len()
    }

    /// Number of bytes currently used (including per-frame length prefixes).
    #[must_use]
    pub fn bytes_used(&self) -> usize {
        self.lock().bytes_used
    }

    /// Total byte capacity of the buffer.
    #[must_use]
    pub fn capacity(&self) -> usize {
        self.lock().capacity
    }

    /// Clear all buffered frames.
    pub fn clear(&self) {
        self.lock().clear();
    }
}

/// Prints the frame count, bytes used and capacity — never the frame
/// payloads. Takes the interior lock for the duration of the formatting.
impl std::fmt::Debug for RingBuffer {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner = self.lock();
        f.debug_struct("RingBuffer")
            .field("frame_count", &inner.frames.len())
            .field("bytes_used", &inner.bytes_used)
            .field("capacity", &inner.capacity)
            .finish()
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they were pushed.
    #[test]
    fn push_and_pop() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"alpha");
        let _ = rb.push(b"beta");
        let _ = rb.push(b"gamma");

        assert_eq!(rb.frame_count(), 3);
        assert_eq!(rb.try_pop().unwrap(), b"alpha");
        assert_eq!(rb.try_pop().unwrap(), b"beta");
        assert_eq!(rb.try_pop().unwrap(), b"gamma");
        assert!(rb.try_pop().is_none());
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` per
    /// frame, all little-endian, and leaves the buffer empty.
    #[test]
    fn drain_all_format() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Parse: [u32 count][u32 len][bytes]...
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // Nothing follows the last frame.
        assert_eq!(blob.len(), offset2 + 4 + len2);

        // Buffer should be empty now.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// A push that does not fit evicts exactly as many old frames as needed.
    #[test]
    fn overflow_drops_oldest() {
        // Capacity for exactly 2 frames of 4 bytes each:
        //   frame cost = 4 (overhead) + 4 (payload) = 8 bytes
        //   2 frames = 16 bytes
        let rb = RingBuffer::new(16);

        let dropped = rb.push(b"aaaa"); // cost 8, total 8
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"bbbb"); // cost 8, total 16
        assert_eq!(dropped, 0);

        // Third push must drop the oldest to fit.
        let dropped = rb.push(b"cccc"); // cost 8, needs to drop "aaaa"
        assert_eq!(dropped, 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// Draining an empty buffer yields an empty blob (no count header).
    #[test]
    fn empty_drain() {
        let rb = RingBuffer::new(1024);
        let blob = rb.drain_all();
        assert!(blob.is_empty());
    }

    /// The counters track pushes and pops, including the 4-byte prefix.
    #[test]
    fn frame_count_and_bytes() {
        let rb = RingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        let _ = rb.push(b"abc"); // cost = 4 + 3 = 7
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        let _ = rb.push(b"de"); // cost = 4 + 2 = 6
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        let _ = rb.try_pop();
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    /// `clear` discards every frame and zeroes the byte counter.
    #[test]
    fn clear() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    /// One producer and one consumer on separate OS threads: every pushed
    /// frame must be either popped or reported as dropped.
    ///
    /// Plain threads are used so the two sides can actually contend for the
    /// mutex. The consumer stops once the producer has signalled completion
    /// and the buffer is empty, so the test terminates even if frames were
    /// dropped.
    #[test]
    fn concurrent_push_pop() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::thread;

        let rb = Arc::new(RingBuffer::new(64 * 1024));
        let producer_done = Arc::new(AtomicBool::new(false));

        let producer = {
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            thread::spawn(move || {
                let mut dropped = 0usize;
                for i in 0u32..1000 {
                    dropped += rb.push(&i.to_le_bytes());
                }
                done.store(true, Ordering::SeqCst);
                dropped
            })
        };

        let consumer = {
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            thread::spawn(move || {
                let mut popped = 0usize;
                loop {
                    if rb.try_pop().is_some() {
                        popped += 1;
                    } else if done.load(Ordering::SeqCst) {
                        // The flag is set only after the last push, so one
                        // final sweep picks up anything still buffered.
                        while rb.try_pop().is_some() {
                            popped += 1;
                        }
                        break;
                    } else {
                        // Let the producer make progress.
                        thread::yield_now();
                    }
                }
                popped
            })
        };

        let dropped = producer.join().expect("producer thread panicked");
        let popped = consumer.join().expect("consumer thread panicked");

        // 1000 frames at 8 bytes each stay far below the 64 KiB budget, so
        // nothing may be evicted and the consumer must have seen every frame.
        assert_eq!(dropped, 0);
        assert_eq!(popped, 1000);
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// A frame that can never fit is discarded without touching the buffer.
    #[test]
    fn single_large_frame() {
        // Buffer capacity is 32 bytes. A frame of 100 bytes costs 104 bytes
        // — larger than capacity. It should be silently discarded.
        let rb = RingBuffer::new(32);
        let _ = rb.push(b"ok"); // cost 6, fits
        let dropped = rb.push(&[0xFFu8; 100]); // cost 104, too large
        assert_eq!(dropped, 0); // not counted as "dropped oldest"

        // The small frame should still be there.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    /// The buffer stays usable after a drain.
    #[test]
    fn drain_then_push() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        // Buffer is empty after drain; push more.
        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    /// One large push can evict several old frames at once, while a frame
    /// that exceeds the whole capacity evicts nothing.
    #[test]
    fn overflow_cascade() {
        // Room for exactly two 4-byte frames (cost 8 each).
        let rb = RingBuffer::new(16);
        let _ = rb.push(b"aaaa");
        let _ = rb.push(b"bbbb");
        assert_eq!(rb.frame_count(), 2);

        // A 12-byte payload costs 16 bytes: both old frames must be evicted.
        let dropped = rb.push(&[1u8; 12]);
        assert_eq!(dropped, 2);
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 16);

        // A 13-byte payload costs 17 > 16: it cannot fit even in an empty
        // buffer, so it is discarded and nothing is evicted.
        let dropped = rb.push(&[2u8; 13]);
        assert_eq!(dropped, 0);

        // The previously stored frame is still intact.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), [1u8; 12]);
    }

    /// Capacities too small to hold even a length prefix are rejected.
    #[test]
    #[should_panic(expected = "capacity must be at least")]
    fn tiny_capacity_panics() {
        let _ = RingBuffer::new(3); // less than FRAME_OVERHEAD
    }

    /// The convenience constructor uses the 64 KiB default.
    #[test]
    fn with_default_capacity() {
        let rb = RingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}