Skip to main content

conduit_core/
ringbuf.rs

1//! In-process ring buffer for high-frequency streaming.
2//!
3//! [`RingBuffer`] is the breakthrough component of tauri-conduit: an
4//! in-process circular buffer that lets the Rust backend stream binary frames
5//! to the WebView frontend without serialization, IPC, or inter-process shared
6//! memory. The custom protocol handler (`conduit://`) reads directly from it.
7//!
8//! # Design
9//!
10//! The buffer stores variable-length frames with a configurable byte budget
11//! (default 64 KB). When the budget is exceeded, the oldest frames are dropped
12//! to make room — this is lossy by design, because the JS consumer is expected
13//! to drain fast enough for real-time use cases (market data, sensor telemetry,
14//! audio buffers).
15//!
16//! # Wire format (`drain_all`)
17//!
18//! ```text
19//! [u32 LE frame_count]
20//! [u32 LE len_1][bytes_1]
21//! [u32 LE len_2][bytes_2]
22//! ...
23//! ```
24
25use std::sync::Mutex;
26
27use crate::codec::DRAIN_FRAME_OVERHEAD;
28
29// ---------------------------------------------------------------------------
30// Constants
31// ---------------------------------------------------------------------------
32
/// Default capacity in bytes (64 KB, i.e. 65 536 bytes).
///
/// This is the byte budget used by `RingBuffer::with_default_capacity`.
const DEFAULT_CAPACITY: usize = 64 * 1024;
35
36// ---------------------------------------------------------------------------
37// Inner
38// ---------------------------------------------------------------------------
39
/// The unsynchronized interior of the ring buffer.
///
/// Frames are stored pre-formatted in wire layout: `[u32 LE len][bytes]` per
/// frame, so that `drain_all()` can emit the entire frame payload with a
/// single bulk copy instead of N×2 `extend_from_slice` calls.
///
/// This type performs no locking itself; `RingBuffer` keeps it behind a
/// `Mutex`.
struct Inner {
    /// Pre-formatted wire data: frames stored back to back as
    /// `[u32 LE len][bytes][u32 LE len][bytes]...`.
    ///
    /// Bytes before `read_pos` belong to frames that were already evicted;
    /// they stay in the vector until a push compacts it or the buffer is
    /// drained/cleared.
    wire_data: Vec<u8>,
    /// Number of frames currently stored. Kept as `u32` because the drain
    /// header encodes the frame count as a `u32`.
    frame_count: u32,
    /// Start of live data in `wire_data` (frames before this offset have been evicted).
    read_pos: usize,
    /// Total bytes used for capacity accounting: sum of
    /// `DRAIN_FRAME_OVERHEAD + frame.len()` over the live frames. Evicted
    /// bytes still physically present in `wire_data` are not counted.
    bytes_used: usize,
    /// Maximum byte budget that `bytes_used` is held to by the push path.
    capacity: usize,
}
57
58impl Inner {
59    /// Create an empty inner buffer with the given byte budget.
60    fn new(capacity: usize) -> Self {
61        Self {
62            wire_data: Vec::new(),
63            frame_count: 0,
64            read_pos: 0,
65            bytes_used: 0,
66            capacity,
67        }
68    }
69
70    /// Cost of storing a single frame (length prefix + payload).
71    #[inline]
72    fn frame_cost(frame: &[u8]) -> usize {
73        DRAIN_FRAME_OVERHEAD + frame.len()
74    }
75
76    /// Drop the oldest frame by advancing `read_pos`. Returns `true` if
77    /// a frame was actually removed.
78    fn drop_oldest(&mut self) -> bool {
79        if self.frame_count == 0 {
80            return false;
81        }
82        let len_bytes: [u8; 4] = self.wire_data[self.read_pos..self.read_pos + 4]
83            .try_into()
84            .unwrap();
85        let payload_len = u32::from_le_bytes(len_bytes) as usize;
86        let cost = DRAIN_FRAME_OVERHEAD + payload_len;
87        self.read_pos += cost;
88        self.frame_count -= 1;
89        self.bytes_used -= cost;
90        true
91    }
92}
93
94// ---------------------------------------------------------------------------
95// PushOutcome
96// ---------------------------------------------------------------------------
97
/// Outcome of a [`RingBuffer::push_checked`] operation.
///
/// Distinguishes between a frame being accepted (possibly with evictions)
/// and a frame being discarded because it can never fit in the buffer.
/// [`RingBuffer::push`] collapses both cases into a plain eviction count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PushOutcome {
    /// Frame was accepted. The `usize` is the number of older frames
    /// that were evicted to make room (may be `0`).
    Accepted(usize),
    /// Frame was too large to ever fit in this buffer (even when empty)
    /// and was silently discarded. No data was written.
    ///
    /// The same value is reported when the frame length does not fit in the
    /// `u32` length prefix, and when the buffer already holds `u32::MAX`
    /// frames so the `u32` frame-count header could not represent one more.
    TooLarge,
}
111
112// ---------------------------------------------------------------------------
113// RingBuffer
114// ---------------------------------------------------------------------------
115
/// Thread-safe, in-process circular buffer for streaming binary frames.
///
/// Frames are variable-length byte slices stored with a u32 LE length prefix.
/// The buffer enforces a byte budget; when a push would exceed the budget the
/// oldest frames are silently dropped (lossy back-pressure).
///
/// # Thread safety
///
/// All public methods take `&self` and synchronize via an internal [`Mutex`].
/// Contention is expected to be low: typically one producer thread and one
/// consumer (the custom protocol handler draining on a `fetch` call).
pub struct RingBuffer {
    /// All buffer state (frame bytes plus accounting), guarded by one mutex.
    inner: Mutex<Inner>,
}
130
131impl RingBuffer {
132    /// Create a ring buffer with the given byte capacity.
133    ///
134    /// # Panics
135    ///
136    /// Panics if `capacity` is less than `DRAIN_FRAME_OVERHEAD + 1` (5 bytes),
137    /// since at least a 1-byte frame must be storable.
138    pub fn new(capacity: usize) -> Self {
139        assert!(
140            capacity > DRAIN_FRAME_OVERHEAD,
141            "capacity must be at least {} bytes (DRAIN_FRAME_OVERHEAD + 1)",
142            DRAIN_FRAME_OVERHEAD + 1,
143        );
144        Self {
145            inner: Mutex::new(Inner::new(capacity)),
146        }
147    }
148
149    /// Create a ring buffer with the default capacity (64 KB).
150    pub fn with_default_capacity() -> Self {
151        Self::new(DEFAULT_CAPACITY)
152    }
153
154    /// Push a frame into the buffer.
155    ///
156    /// If the frame (plus its 4-byte length prefix) would exceed the byte
157    /// budget, the oldest frames are dropped until there is room. Returns the
158    /// number of frames that were dropped to make space.
159    ///
160    /// If the frame itself is larger than the total capacity it is silently
161    /// discarded and the return value is `0`.
162    pub fn push(&self, frame: &[u8]) -> usize {
163        match self.push_checked(frame) {
164            PushOutcome::Accepted(n) => n,
165            PushOutcome::TooLarge => 0,
166        }
167    }
168
169    /// Push a frame with a richer outcome report.
170    ///
171    /// Like [`push`](Self::push), but returns [`PushOutcome::TooLarge`] when
172    /// the frame can never fit, instead of silently returning `0`.
173    #[must_use]
174    pub fn push_checked(&self, frame: &[u8]) -> PushOutcome {
175        // Guard: frame length must fit in u32 (wire format invariant) and
176        // frame_cost must not overflow usize (relevant on 32-bit targets).
177        if frame.len() > u32::MAX as usize
178            || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none()
179        {
180            return PushOutcome::TooLarge;
181        }
182
183        let cost = Inner::frame_cost(frame);
184        let mut inner = crate::lock_or_recover(&self.inner);
185
186        // Frame too large for this buffer — discard it.
187        if cost > inner.capacity {
188            return PushOutcome::TooLarge;
189        }
190
191        let mut dropped = 0usize;
192        while inner.bytes_used + cost > inner.capacity {
193            if !inner.drop_oldest() {
194                break;
195            }
196            dropped += 1;
197        }
198
199        // Compact if read_pos is more than half the allocated buffer.
200        if inner.read_pos > 0 && inner.read_pos > inner.wire_data.len() / 2 {
201            let rp = inner.read_pos;
202            inner.wire_data.copy_within(rp.., 0);
203            let new_len = inner.wire_data.len() - rp;
204            inner.wire_data.truncate(new_len);
205            inner.read_pos = 0;
206        }
207
208        // Guard: frame count must fit in u32 (wire format uses u32 count header).
209        if inner.frame_count == u32::MAX {
210            return PushOutcome::TooLarge;
211        }
212
213        // Append frame in wire format: [u32 LE len][bytes].
214        inner
215            .wire_data
216            .extend_from_slice(&(frame.len() as u32).to_le_bytes());
217        inner.wire_data.extend_from_slice(frame);
218        inner.frame_count += 1;
219        inner.bytes_used += cost;
220        PushOutcome::Accepted(dropped)
221    }
222
223    /// Drain all buffered frames into a single binary blob and clear the
224    /// buffer.
225    ///
226    /// # Wire format
227    ///
228    /// ```text
229    /// [u32 LE frame_count]
230    /// [u32 LE len_1][bytes_1]
231    /// [u32 LE len_2][bytes_2]
232    /// ...
233    /// ```
234    ///
235    /// Returns an empty `Vec` if the buffer is empty.
236    #[must_use]
237    pub fn drain_all(&self) -> Vec<u8> {
238        // Take the pre-formatted wire data out under the lock, then prepend
239        // the frame count header without contention.
240        let (wire_data, read_pos, frame_count) = {
241            let mut inner = crate::lock_or_recover(&self.inner);
242            if inner.frame_count == 0 {
243                return Vec::new();
244            }
245            let wire_data = std::mem::take(&mut inner.wire_data);
246            let read_pos = inner.read_pos;
247            let frame_count = inner.frame_count;
248            inner.read_pos = 0;
249            inner.frame_count = 0;
250            inner.bytes_used = 0;
251            (wire_data, read_pos, frame_count)
252        };
253        // Lock released — build output with TWO extend_from_slice calls (was N×2).
254        let live_data = &wire_data[read_pos..];
255        let output_size = 4 + live_data.len();
256        let mut buf = Vec::with_capacity(output_size);
257        buf.extend_from_slice(&frame_count.to_le_bytes());
258        buf.extend_from_slice(live_data);
259        buf
260    }
261
262    /// Read one frame from the front of the buffer (FIFO).
263    ///
264    /// Returns `None` if the buffer is empty.
265    #[must_use]
266    pub fn try_pop(&self) -> Option<Vec<u8>> {
267        let mut inner = crate::lock_or_recover(&self.inner);
268        if inner.frame_count == 0 {
269            return None;
270        }
271        let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4]
272            .try_into()
273            .unwrap();
274        let payload_len = u32::from_le_bytes(len_bytes) as usize;
275        let payload_start = inner.read_pos + 4;
276        let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec();
277        let cost = DRAIN_FRAME_OVERHEAD + payload_len;
278        inner.read_pos += cost;
279        inner.frame_count -= 1;
280        inner.bytes_used -= cost;
281
282        // Compact when empty.
283        if inner.frame_count == 0 {
284            inner.wire_data.clear();
285            inner.read_pos = 0;
286        }
287
288        Some(frame)
289    }
290
291    /// Number of frames currently buffered.
292    #[must_use]
293    pub fn frame_count(&self) -> usize {
294        crate::lock_or_recover(&self.inner).frame_count as usize
295    }
296
297    /// Number of bytes currently used (including per-frame length prefixes).
298    #[must_use]
299    pub fn bytes_used(&self) -> usize {
300        crate::lock_or_recover(&self.inner).bytes_used
301    }
302
303    /// Total byte capacity of the buffer.
304    #[must_use]
305    pub fn capacity(&self) -> usize {
306        crate::lock_or_recover(&self.inner).capacity
307    }
308
309    /// Clear all buffered frames.
310    pub fn clear(&self) {
311        let mut inner = crate::lock_or_recover(&self.inner);
312        inner.wire_data.clear();
313        inner.frame_count = 0;
314        inner.read_pos = 0;
315        inner.bytes_used = 0;
316    }
317}
318
319impl std::fmt::Debug for RingBuffer {
320    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321        let inner = crate::lock_or_recover(&self.inner);
322        f.debug_struct("RingBuffer")
323            .field("frame_count", &inner.frame_count)
324            .field("bytes_used", &inner.bytes_used)
325            .field("capacity", &inner.capacity)
326            .finish()
327    }
328}
329
330// ---------------------------------------------------------------------------
331// Tests
332// ---------------------------------------------------------------------------
333
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they were pushed.
    #[test]
    fn push_and_pop() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"alpha");
        let _ = rb.push(b"beta");
        let _ = rb.push(b"gamma");

        assert_eq!(rb.frame_count(), 3);
        assert_eq!(rb.try_pop().unwrap(), b"alpha");
        assert_eq!(rb.try_pop().unwrap(), b"beta");
        assert_eq!(rb.try_pop().unwrap(), b"gamma");
        assert!(rb.try_pop().is_none());
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` frames.
    #[test]
    fn drain_all_format() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Parse: [u32 count][u32 len][bytes]...
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // The blob contains nothing beyond the two frames.
        assert_eq!(blob.len(), offset2 + 4 + len2);

        // Buffer should be empty now.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// Draining after an eviction must skip the evicted frame's bytes.
    #[test]
    fn drain_after_eviction_skips_dropped_frames() {
        // Room for exactly two 4-byte frames (cost 8 each).
        let rb = RingBuffer::new(16);
        let _ = rb.push(b"aaaa");
        let _ = rb.push(b"bbbb");
        assert_eq!(rb.push(b"cccc"), 1); // evicts "aaaa"

        let mut expected = Vec::new();
        expected.extend_from_slice(&2u32.to_le_bytes());
        expected.extend_from_slice(&4u32.to_le_bytes());
        expected.extend_from_slice(b"bbbb");
        expected.extend_from_slice(&4u32.to_le_bytes());
        expected.extend_from_slice(b"cccc");

        assert_eq!(rb.drain_all(), expected);
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    #[test]
    fn overflow_drops_oldest() {
        // Capacity for exactly 2 frames of 4 bytes each:
        //   frame cost = 4 (overhead) + 4 (payload) = 8 bytes
        //   2 frames = 16 bytes
        let rb = RingBuffer::new(16);

        let dropped = rb.push(b"aaaa"); // cost 8, total 8
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"bbbb"); // cost 8, total 16
        assert_eq!(dropped, 0);

        // Third push must drop the oldest to fit.
        let dropped = rb.push(b"cccc"); // drops "aaaa"
        assert_eq!(dropped, 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// `push_checked` distinguishes acceptance, eviction and rejection.
    #[test]
    fn push_checked_outcomes() {
        let rb = RingBuffer::new(16);

        assert_eq!(rb.push_checked(b"aaaa"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"bbbb"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"cccc"), PushOutcome::Accepted(1));

        // 13-byte payload costs 17 > 16: rejected, stored frames untouched.
        assert_eq!(rb.push_checked(&[0u8; 13]), PushOutcome::TooLarge);
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 16);

        // 12-byte payload costs exactly 16: fits, but only after evicting
        // both stored frames.
        assert_eq!(rb.push_checked(&[7u8; 12]), PushOutcome::Accepted(2));
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 16);
        assert_eq!(rb.try_pop().unwrap(), vec![7u8; 12]);
    }

    /// An empty payload is a valid frame that costs only the length prefix.
    #[test]
    fn zero_length_frame() {
        let rb = RingBuffer::new(1024);
        assert_eq!(rb.push_checked(b""), PushOutcome::Accepted(0));
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 4);
        assert_eq!(rb.try_pop().unwrap(), Vec::<u8>::new());
        assert!(rb.try_pop().is_none());
    }

    #[test]
    fn empty_drain() {
        let rb = RingBuffer::new(1024);
        let blob = rb.drain_all();
        assert!(blob.is_empty());
    }

    #[test]
    fn frame_count_and_bytes() {
        let rb = RingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        let _ = rb.push(b"abc"); // cost = 4 + 3 = 7
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        let _ = rb.push(b"de"); // cost = 4 + 2 = 6
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        let _ = rb.try_pop();
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    #[test]
    fn clear() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    /// One producer and one consumer share the buffer; no frame may be lost
    /// because 1000 frames of cost 8 fit comfortably in 64 KB.
    #[tokio::test]
    async fn concurrent_push_pop() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const FRAMES: u32 = 1000;

        let rb = Arc::new(RingBuffer::new(64 * 1024));
        let producer_done = Arc::new(AtomicBool::new(false));

        let producer = tokio::spawn({
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            async move {
                for i in 0..FRAMES {
                    let _ = rb.push(&i.to_le_bytes());
                }
                done.store(true, Ordering::SeqCst);
            }
        });

        let consumer = tokio::spawn({
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            async move {
                let mut popped = 0usize;
                loop {
                    // Sample the flag BEFORE popping: if the producer had
                    // already finished and the pop still finds nothing, no
                    // further frame can arrive, so the loop may stop. This
                    // makes a lost frame fail the assertion below instead of
                    // spinning forever waiting for a count never reached.
                    let finished = done.load(Ordering::SeqCst);
                    match rb.try_pop() {
                        Some(_frame) => popped += 1,
                        None if finished => break,
                        // Yield to let the producer make progress.
                        None => tokio::task::yield_now().await,
                    }
                }
                popped
            }
        });

        producer.await.unwrap();
        let consumer_popped = consumer.await.unwrap();

        // Every pushed frame was either popped or is still buffered.
        let remaining = rb.frame_count();
        assert_eq!(consumer_popped + remaining, FRAMES as usize);
    }

    #[test]
    fn single_large_frame() {
        // Buffer capacity is 32 bytes. A frame of 100 bytes costs 104 bytes
        // — larger than capacity. It should be silently discarded.
        let rb = RingBuffer::new(32);
        let _ = rb.push(b"ok"); // cost 6, fits
        let dropped = rb.push(&[0xFFu8; 100]); // cost 104, too large
        assert_eq!(dropped, 0); // not counted as "dropped oldest"

        // The small frame should still be there.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    #[test]
    fn drain_then_push() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        // Buffer is empty after drain; push more.
        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    #[test]
    fn overflow_cascade() {
        // Capacity for exactly one 4-byte frame (cost = 8).
        let rb = RingBuffer::new(8);

        let _ = rb.push(b"aaaa"); // cost 8, fills completely
        assert_eq!(rb.frame_count(), 1);

        // Push a larger frame (6 bytes, cost 10 > 8) — too large for buffer.
        let dropped = rb.push(&[0u8; 6]);
        // The frame cannot fit even in an empty buffer, so it's discarded.
        assert_eq!(dropped, 0);

        // Original frame should still be intact.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"aaaa");
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 5 bytes")]
    fn tiny_capacity_panics() {
        RingBuffer::new(4); // equal to DRAIN_FRAME_OVERHEAD, but less than DRAIN_FRAME_OVERHEAD + 1
    }

    #[test]
    fn with_default_capacity() {
        let rb = RingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}