Skip to main content

conduit_core/
ringbuf.rs

1//! In-process ring buffer for high-frequency streaming.
2//!
3//! [`RingBuffer`] is the breakthrough component of tauri-conduit: an
4//! in-process circular buffer that lets the Rust backend stream binary frames
5//! to the WebView frontend without serialization, IPC, or inter-process shared
6//! memory. The custom protocol handler (`conduit://`) reads directly from it.
7//!
8//! # Design
9//!
10//! The buffer stores variable-length frames with a configurable byte budget
11//! (default 64 KB). When the budget is exceeded, the oldest frames are dropped
12//! to make room — this is lossy by design, because the JS consumer is expected
13//! to drain fast enough for real-time use cases (market data, sensor telemetry,
14//! audio buffers).
15//!
16//! # Wire format (`drain_all`)
17//!
18//! ```text
19//! [u32 LE frame_count]
20//! [u32 LE len_1][bytes_1]
21//! [u32 LE len_2][bytes_2]
22//! ...
23//! ```
24
25use std::collections::VecDeque;
26use std::sync::Mutex;
27
28use crate::codec::DRAIN_FRAME_OVERHEAD;
29
30// ---------------------------------------------------------------------------
31// Constants
32// ---------------------------------------------------------------------------
33
34/// Default capacity in bytes (64 KB).
35const DEFAULT_CAPACITY: usize = 64 * 1024;
36
37// ---------------------------------------------------------------------------
38// Inner
39// ---------------------------------------------------------------------------
40
/// Plain, lock-free-of-its-own state of the ring buffer.
///
/// Nothing in here synchronizes access; callers are expected to guard the
/// whole value with a lock.
struct Inner {
    /// Upper bound, in bytes, for `bytes_used`.
    capacity: usize,
    /// Running total of `DRAIN_FRAME_OVERHEAD + frame.len()` over every
    /// frame currently held in `frames`.
    bytes_used: usize,
    /// Queued frames, oldest at the front.
    frames: VecDeque<Vec<u8>>,
}
50
51impl Inner {
52    /// Create an empty inner buffer with the given byte budget.
53    fn new(capacity: usize) -> Self {
54        Self {
55            frames: VecDeque::new(),
56            bytes_used: 0,
57            capacity,
58        }
59    }
60
61    /// Cost of storing a single frame (length prefix + payload).
62    #[inline]
63    fn frame_cost(frame: &[u8]) -> usize {
64        DRAIN_FRAME_OVERHEAD + frame.len()
65    }
66
67    /// Drop the oldest frame, adjusting the byte counter. Returns `true` if
68    /// a frame was actually removed.
69    fn drop_oldest(&mut self) -> bool {
70        if let Some(old) = self.frames.pop_front() {
71            self.bytes_used -= Self::frame_cost(&old);
72            true
73        } else {
74            false
75        }
76    }
77}
78
79// ---------------------------------------------------------------------------
80// PushOutcome
81// ---------------------------------------------------------------------------
82
/// Result reported by [`RingBuffer::push_checked`].
///
/// Lets the caller tell apart "the frame is now buffered" (with or without
/// evicting older frames) from "the frame was thrown away because it can
/// never fit".
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushOutcome {
    /// The frame is now buffered. The payload is how many older frames were
    /// evicted to make room for it; `0` means nothing was lost.
    Accepted(usize),
    /// The frame exceeds what the buffer could hold even when empty. It was
    /// discarded and the buffer contents are unchanged.
    TooLarge,
}
96
97// ---------------------------------------------------------------------------
98// RingBuffer
99// ---------------------------------------------------------------------------
100
101/// Thread-safe, in-process circular buffer for streaming binary frames.
102///
103/// Frames are variable-length byte slices stored with a u32 LE length prefix.
104/// The buffer enforces a byte budget; when a push would exceed the budget the
105/// oldest frames are silently dropped (lossy back-pressure).
106///
107/// # Thread safety
108///
109/// All public methods take `&self` and synchronize via an internal [`Mutex`].
110/// Contention is expected to be low: typically one producer thread and one
111/// consumer (the custom protocol handler draining on a `fetch` call).
112pub struct RingBuffer {
113    inner: Mutex<Inner>,
114}
115
116impl RingBuffer {
117    /// Create a ring buffer with the given byte capacity.
118    ///
119    /// # Panics
120    ///
121    /// Panics if `capacity` is less than `DRAIN_FRAME_OVERHEAD + 1` (5 bytes),
122    /// since at least a 1-byte frame must be storable.
123    pub fn new(capacity: usize) -> Self {
124        assert!(
125            capacity > DRAIN_FRAME_OVERHEAD,
126            "capacity must be at least {} bytes (DRAIN_FRAME_OVERHEAD + 1)",
127            DRAIN_FRAME_OVERHEAD + 1,
128        );
129        Self {
130            inner: Mutex::new(Inner::new(capacity)),
131        }
132    }
133
134    /// Create a ring buffer with the default capacity (64 KB).
135    pub fn with_default_capacity() -> Self {
136        Self::new(DEFAULT_CAPACITY)
137    }
138
139    /// Push a frame into the buffer.
140    ///
141    /// If the frame (plus its 4-byte length prefix) would exceed the byte
142    /// budget, the oldest frames are dropped until there is room. Returns the
143    /// number of frames that were dropped to make space.
144    ///
145    /// If the frame itself is larger than the total capacity it is silently
146    /// discarded and the return value is `0`.
147    pub fn push(&self, frame: &[u8]) -> usize {
148        match self.push_checked(frame) {
149            PushOutcome::Accepted(n) => n,
150            PushOutcome::TooLarge => 0,
151        }
152    }
153
154    /// Push a frame with a richer outcome report.
155    ///
156    /// Like [`push`](Self::push), but returns [`PushOutcome::TooLarge`] when
157    /// the frame can never fit, instead of silently returning `0`.
158    #[must_use]
159    pub fn push_checked(&self, frame: &[u8]) -> PushOutcome {
160        let cost = Inner::frame_cost(frame);
161        let mut inner = crate::lock_or_recover(&self.inner);
162
163        // Frame too large for this buffer — discard it.
164        if cost > inner.capacity {
165            return PushOutcome::TooLarge;
166        }
167
168        let mut dropped = 0usize;
169        while inner.bytes_used + cost > inner.capacity {
170            if !inner.drop_oldest() {
171                break;
172            }
173            dropped += 1;
174        }
175
176        inner.frames.push_back(frame.to_vec());
177        inner.bytes_used += cost;
178        PushOutcome::Accepted(dropped)
179    }
180
181    /// Drain all buffered frames into a single binary blob and clear the
182    /// buffer.
183    ///
184    /// # Wire format
185    ///
186    /// ```text
187    /// [u32 LE frame_count]
188    /// [u32 LE len_1][bytes_1]
189    /// [u32 LE len_2][bytes_2]
190    /// ...
191    /// ```
192    ///
193    /// Returns an empty `Vec` if the buffer is empty.
194    #[must_use]
195    pub fn drain_all(&self) -> Vec<u8> {
196        // Swap the frames out under the lock, then serialize without contention.
197        let (mut frames, bytes_used) = {
198            let mut inner = crate::lock_or_recover(&self.inner);
199            if inner.frames.is_empty() {
200                return Vec::new();
201            }
202            let frames = std::mem::take(&mut inner.frames);
203            let bytes_used = inner.bytes_used;
204            inner.bytes_used = 0;
205            (frames, bytes_used)
206        };
207        // Lock released — serialize without holding the mutex.
208        let output_size = 4usize.saturating_add(bytes_used);
209        let mut buf = Vec::with_capacity(output_size);
210
211        // Frame count header.
212        let count = frames.len() as u32;
213        buf.extend_from_slice(&count.to_le_bytes());
214
215        // Each frame: [u32 LE len][bytes].
216        for frame in frames.make_contiguous() {
217            let len = frame.len() as u32;
218            buf.extend_from_slice(&len.to_le_bytes());
219            buf.extend_from_slice(frame);
220        }
221
222        buf
223    }
224
225    /// Read one frame from the front of the buffer (FIFO).
226    ///
227    /// Returns `None` if the buffer is empty.
228    #[must_use]
229    pub fn try_pop(&self) -> Option<Vec<u8>> {
230        let mut inner = crate::lock_or_recover(&self.inner);
231        let frame = inner.frames.pop_front()?;
232        inner.bytes_used -= Inner::frame_cost(&frame);
233        Some(frame)
234    }
235
236    /// Number of frames currently buffered.
237    #[must_use]
238    pub fn frame_count(&self) -> usize {
239        crate::lock_or_recover(&self.inner).frames.len()
240    }
241
242    /// Number of bytes currently used (including per-frame length prefixes).
243    #[must_use]
244    pub fn bytes_used(&self) -> usize {
245        crate::lock_or_recover(&self.inner).bytes_used
246    }
247
248    /// Total byte capacity of the buffer.
249    #[must_use]
250    pub fn capacity(&self) -> usize {
251        crate::lock_or_recover(&self.inner).capacity
252    }
253
254    /// Clear all buffered frames.
255    pub fn clear(&self) {
256        let mut inner = crate::lock_or_recover(&self.inner);
257        inner.frames.clear();
258        inner.bytes_used = 0;
259    }
260}
261
262impl std::fmt::Debug for RingBuffer {
263    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264        let inner = crate::lock_or_recover(&self.inner);
265        f.debug_struct("RingBuffer")
266            .field("frame_count", &inner.frames.len())
267            .field("bytes_used", &inner.bytes_used)
268            .field("capacity", &inner.capacity)
269            .finish()
270    }
271}
272
273// ---------------------------------------------------------------------------
274// Tests
275// ---------------------------------------------------------------------------
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they were pushed.
    #[test]
    fn push_and_pop() {
        let rb = RingBuffer::new(1024);
        let frames: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];
        for frame in frames {
            let _ = rb.push(frame);
        }

        assert_eq!(rb.frame_count(), 3);
        for expected in frames {
            assert_eq!(rb.try_pop().unwrap(), expected);
        }
        assert!(rb.try_pop().is_none());
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` per
    /// frame, all little-endian, and leaves the buffer empty.
    #[test]
    fn drain_all_format() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Parse: [u32 count][u32 len][bytes]...
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // Nothing trails the last frame.
        assert_eq!(blob.len(), offset2 + 4 + len2);

        // Buffer should be empty now.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// Once the byte budget is full, a push evicts from the front.
    #[test]
    fn overflow_drops_oldest() {
        // Capacity for exactly 2 frames of 4 bytes each:
        //   frame cost = 4 (overhead) + 4 (payload) = 8 bytes
        //   2 frames = 16 bytes
        let rb = RingBuffer::new(16);

        assert_eq!(rb.push(b"aaaa"), 0); // cost 8, total 8
        assert_eq!(rb.push(b"bbbb"), 0); // cost 8, total 16

        // Third push must drop the oldest ("aaaa") to fit.
        assert_eq!(rb.push(b"cccc"), 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// Draining an empty buffer yields an empty blob (no count header).
    #[test]
    fn empty_drain() {
        let rb = RingBuffer::new(1024);
        assert!(rb.drain_all().is_empty());
    }

    /// `frame_count` / `bytes_used` track pushes and pops, prefix included.
    #[test]
    fn frame_count_and_bytes() {
        let rb = RingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        let _ = rb.push(b"abc"); // cost = 4 + 3 = 7
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        let _ = rb.push(b"de"); // cost = 4 + 2 = 6
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        let _ = rb.try_pop(); // removes "abc"
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    /// `clear` removes every frame and resets the byte counter.
    #[test]
    fn clear() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    /// A producer task and a consumer task share one buffer; every pushed
    /// frame must be either popped or still buffered at the end.
    #[tokio::test]
    async fn concurrent_push_pop() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const TOTAL: u32 = 1000;

        let rb = Arc::new(RingBuffer::new(64 * 1024));
        let producer_done = Arc::new(AtomicBool::new(false));

        let producer = tokio::spawn({
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            async move {
                let mut evicted = 0usize;
                for i in 0..TOTAL {
                    evicted += rb.push(&i.to_le_bytes());
                }
                done.store(true, Ordering::Release);
                evicted
            }
        });

        let consumer = tokio::spawn({
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            async move {
                let mut popped = 0usize;
                loop {
                    // Read the flag BEFORE popping: if the producer had
                    // already finished and the pop still finds nothing, no
                    // further frame can ever arrive. Exiting on this
                    // condition (rather than on a pop count) means an
                    // unexpected eviction fails the assertions below instead
                    // of spinning forever.
                    let finished = done.load(Ordering::Acquire);
                    match rb.try_pop() {
                        Some(_frame) => popped += 1,
                        None if finished => break,
                        // Yield to let the producer make progress.
                        None => tokio::task::yield_now().await,
                    }
                }
                popped
            }
        });

        let evicted = producer.await.unwrap();
        let popped = consumer.await.unwrap();

        // 1000 frames * 8 bytes each is far below the 64 KB budget, so
        // nothing may have been evicted and every frame is accounted for.
        assert_eq!(evicted, 0);
        assert_eq!(popped + rb.frame_count(), TOTAL as usize);
    }

    /// A frame bigger than the whole budget is discarded without touching
    /// what is already buffered.
    #[test]
    fn single_large_frame() {
        // Buffer capacity is 32 bytes. A frame of 100 bytes costs 104 bytes
        // — larger than capacity. It should be silently discarded.
        let rb = RingBuffer::new(32);
        let _ = rb.push(b"ok"); // cost 6, fits
        let dropped = rb.push(&[0xFFu8; 100]); // cost 104, too large
        assert_eq!(dropped, 0); // not counted as "dropped oldest"

        // The small frame should still be there.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    /// The buffer is reusable after a drain.
    #[test]
    fn drain_then_push() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        // Buffer is empty after drain; push more.
        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    /// An oversized push must not evict existing frames on its way to being
    /// rejected.
    #[test]
    fn overflow_cascade() {
        // Capacity for exactly one 4-byte frame (cost = 8).
        let rb = RingBuffer::new(8);

        let _ = rb.push(b"aaaa"); // cost 8, fills completely
        assert_eq!(rb.frame_count(), 1);

        // Push a larger frame (6 bytes, cost 10 > 8) — too large for buffer.
        let dropped = rb.push(&[0u8; 6]);
        // The frame cannot fit even in an empty buffer, so it's discarded.
        assert_eq!(dropped, 0);

        // Original frame should still be intact.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"aaaa");
    }

    /// `push_checked` distinguishes "accepted" (with eviction count) from
    /// "can never fit", which plain `push` collapses into `0`.
    #[test]
    fn push_checked_reports_outcome() {
        let rb = RingBuffer::new(16);

        assert_eq!(rb.push_checked(b"aaaa"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"bbbb"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"cccc"), PushOutcome::Accepted(1));

        // 13-byte payload costs 17 > 16: rejected, buffer untouched.
        assert_eq!(rb.push_checked(&[0u8; 13]), PushOutcome::TooLarge);
        assert_eq!(rb.frame_count(), 2);

        // 12-byte payload costs exactly 16: fits, but only after evicting
        // both buffered frames.
        assert_eq!(rb.push_checked(&[0u8; 12]), PushOutcome::Accepted(2));
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 16);
    }

    /// Zero-length frames are legal and still pay for their length prefix.
    #[test]
    fn empty_frame_round_trip() {
        let rb = RingBuffer::new(1024);
        assert_eq!(rb.push_checked(b""), PushOutcome::Accepted(0));
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 4);
        assert_eq!(rb.try_pop().unwrap(), b"");
        assert_eq!(rb.bytes_used(), 0);
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 5 bytes")]
    fn tiny_capacity_panics() {
        // 4 equals DRAIN_FRAME_OVERHEAD, one short of the minimum
        // (DRAIN_FRAME_OVERHEAD + 1).
        let _ = RingBuffer::new(4);
    }

    /// The convenience constructor uses the documented 64 KB budget.
    #[test]
    fn with_default_capacity() {
        let rb = RingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}
485}