conduit_core/ringbuf.rs
1//! In-process ring buffer for high-frequency streaming.
2//!
3//! [`RingBuffer`] is the breakthrough component of tauri-conduit: an
4//! in-process circular buffer that lets the Rust backend stream binary frames
5//! to the WebView frontend without serialization, IPC, or inter-process shared
6//! memory. The custom protocol handler (`conduit://`) reads directly from it.
7//!
8//! # Design
9//!
10//! The buffer stores variable-length frames with a configurable byte budget
11//! (default 64 KB). When the budget is exceeded, the oldest frames are dropped
12//! to make room — this is lossy by design, because the JS consumer is expected
13//! to drain fast enough for real-time use cases (market data, sensor telemetry,
14//! audio buffers).
15//!
16//! # Wire format (`drain_all`)
17//!
18//! ```text
19//! [u32 LE frame_count]
20//! [u32 LE len_1][bytes_1]
21//! [u32 LE len_2][bytes_2]
22//! ...
23//! ```
24
25use std::collections::VecDeque;
26use std::sync::Mutex;
27
28use crate::codec::DRAIN_FRAME_OVERHEAD;
29
30// ---------------------------------------------------------------------------
31// Constants
32// ---------------------------------------------------------------------------
33
34/// Default capacity in bytes (64 KB).
35const DEFAULT_CAPACITY: usize = 64 * 1024;
36
37// ---------------------------------------------------------------------------
38// Inner
39// ---------------------------------------------------------------------------
40
41/// The unsynchronized interior of the ring buffer.
42struct Inner {
43 /// Buffered frames in FIFO order.
44 frames: VecDeque<Vec<u8>>,
45 /// Total bytes used: sum of (DRAIN_FRAME_OVERHEAD + frame.len()) for each frame.
46 bytes_used: usize,
47 /// Maximum byte budget.
48 capacity: usize,
49}
50
51impl Inner {
52 /// Create an empty inner buffer with the given byte budget.
53 fn new(capacity: usize) -> Self {
54 Self {
55 frames: VecDeque::new(),
56 bytes_used: 0,
57 capacity,
58 }
59 }
60
61 /// Cost of storing a single frame (length prefix + payload).
62 #[inline]
63 fn frame_cost(frame: &[u8]) -> usize {
64 DRAIN_FRAME_OVERHEAD + frame.len()
65 }
66
67 /// Drop the oldest frame, adjusting the byte counter. Returns `true` if
68 /// a frame was actually removed.
69 fn drop_oldest(&mut self) -> bool {
70 if let Some(old) = self.frames.pop_front() {
71 self.bytes_used -= Self::frame_cost(&old);
72 true
73 } else {
74 false
75 }
76 }
77}
78
79// ---------------------------------------------------------------------------
80// PushOutcome
81// ---------------------------------------------------------------------------
82
/// Result reported by [`RingBuffer::push_checked`].
///
/// Tells the caller whether the frame was stored (and how many older frames
/// had to be evicted for it) or whether it was rejected outright because it
/// can never fit in the buffer.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PushOutcome {
    /// The frame was stored. The payload is the number of older frames that
    /// were evicted to make room for it (`0` when nothing was evicted).
    Accepted(usize),
    /// The frame is too large to fit even in an empty buffer. It was
    /// discarded and the buffer was left untouched.
    TooLarge,
}
96
97// ---------------------------------------------------------------------------
98// RingBuffer
99// ---------------------------------------------------------------------------
100
101/// Thread-safe, in-process circular buffer for streaming binary frames.
102///
103/// Frames are variable-length byte slices stored with a u32 LE length prefix.
104/// The buffer enforces a byte budget; when a push would exceed the budget the
105/// oldest frames are silently dropped (lossy back-pressure).
106///
107/// # Thread safety
108///
109/// All public methods take `&self` and synchronize via an internal [`Mutex`].
110/// Contention is expected to be low: typically one producer thread and one
111/// consumer (the custom protocol handler draining on a `fetch` call).
112pub struct RingBuffer {
113 inner: Mutex<Inner>,
114}
115
116impl RingBuffer {
117 /// Create a ring buffer with the given byte capacity.
118 ///
119 /// # Panics
120 ///
121 /// Panics if `capacity` is less than `DRAIN_FRAME_OVERHEAD + 1` (5 bytes),
122 /// since at least a 1-byte frame must be storable.
123 pub fn new(capacity: usize) -> Self {
124 assert!(
125 capacity > DRAIN_FRAME_OVERHEAD,
126 "capacity must be at least {} bytes (DRAIN_FRAME_OVERHEAD + 1)",
127 DRAIN_FRAME_OVERHEAD + 1,
128 );
129 Self {
130 inner: Mutex::new(Inner::new(capacity)),
131 }
132 }
133
134 /// Create a ring buffer with the default capacity (64 KB).
135 pub fn with_default_capacity() -> Self {
136 Self::new(DEFAULT_CAPACITY)
137 }
138
139 /// Push a frame into the buffer.
140 ///
141 /// If the frame (plus its 4-byte length prefix) would exceed the byte
142 /// budget, the oldest frames are dropped until there is room. Returns the
143 /// number of frames that were dropped to make space.
144 ///
145 /// If the frame itself is larger than the total capacity it is silently
146 /// discarded and the return value is `0`.
147 pub fn push(&self, frame: &[u8]) -> usize {
148 match self.push_checked(frame) {
149 PushOutcome::Accepted(n) => n,
150 PushOutcome::TooLarge => 0,
151 }
152 }
153
154 /// Push a frame with a richer outcome report.
155 ///
156 /// Like [`push`](Self::push), but returns [`PushOutcome::TooLarge`] when
157 /// the frame can never fit, instead of silently returning `0`.
158 #[must_use]
159 pub fn push_checked(&self, frame: &[u8]) -> PushOutcome {
160 let cost = Inner::frame_cost(frame);
161 let mut inner = crate::lock_or_recover(&self.inner);
162
163 // Frame too large for this buffer — discard it.
164 if cost > inner.capacity {
165 return PushOutcome::TooLarge;
166 }
167
168 let mut dropped = 0usize;
169 while inner.bytes_used + cost > inner.capacity {
170 if !inner.drop_oldest() {
171 break;
172 }
173 dropped += 1;
174 }
175
176 inner.frames.push_back(frame.to_vec());
177 inner.bytes_used += cost;
178 PushOutcome::Accepted(dropped)
179 }
180
181 /// Drain all buffered frames into a single binary blob and clear the
182 /// buffer.
183 ///
184 /// # Wire format
185 ///
186 /// ```text
187 /// [u32 LE frame_count]
188 /// [u32 LE len_1][bytes_1]
189 /// [u32 LE len_2][bytes_2]
190 /// ...
191 /// ```
192 ///
193 /// Returns an empty `Vec` if the buffer is empty.
194 #[must_use]
195 pub fn drain_all(&self) -> Vec<u8> {
196 // Swap the frames out under the lock, then serialize without contention.
197 let (mut frames, bytes_used) = {
198 let mut inner = crate::lock_or_recover(&self.inner);
199 if inner.frames.is_empty() {
200 return Vec::new();
201 }
202 let frames = std::mem::take(&mut inner.frames);
203 let bytes_used = inner.bytes_used;
204 inner.bytes_used = 0;
205 (frames, bytes_used)
206 };
207 // Lock released — serialize without holding the mutex.
208 let output_size = 4usize.saturating_add(bytes_used);
209 let mut buf = Vec::with_capacity(output_size);
210
211 // Frame count header.
212 let count = frames.len() as u32;
213 buf.extend_from_slice(&count.to_le_bytes());
214
215 // Each frame: [u32 LE len][bytes].
216 for frame in frames.make_contiguous() {
217 let len = frame.len() as u32;
218 buf.extend_from_slice(&len.to_le_bytes());
219 buf.extend_from_slice(frame);
220 }
221
222 buf
223 }
224
225 /// Read one frame from the front of the buffer (FIFO).
226 ///
227 /// Returns `None` if the buffer is empty.
228 #[must_use]
229 pub fn try_pop(&self) -> Option<Vec<u8>> {
230 let mut inner = crate::lock_or_recover(&self.inner);
231 let frame = inner.frames.pop_front()?;
232 inner.bytes_used -= Inner::frame_cost(&frame);
233 Some(frame)
234 }
235
236 /// Number of frames currently buffered.
237 #[must_use]
238 pub fn frame_count(&self) -> usize {
239 crate::lock_or_recover(&self.inner).frames.len()
240 }
241
242 /// Number of bytes currently used (including per-frame length prefixes).
243 #[must_use]
244 pub fn bytes_used(&self) -> usize {
245 crate::lock_or_recover(&self.inner).bytes_used
246 }
247
248 /// Total byte capacity of the buffer.
249 #[must_use]
250 pub fn capacity(&self) -> usize {
251 crate::lock_or_recover(&self.inner).capacity
252 }
253
254 /// Clear all buffered frames.
255 pub fn clear(&self) {
256 let mut inner = crate::lock_or_recover(&self.inner);
257 inner.frames.clear();
258 inner.bytes_used = 0;
259 }
260}
261
262impl std::fmt::Debug for RingBuffer {
263 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
264 let inner = crate::lock_or_recover(&self.inner);
265 f.debug_struct("RingBuffer")
266 .field("frame_count", &inner.frames.len())
267 .field("bytes_used", &inner.bytes_used)
268 .field("capacity", &inner.capacity)
269 .finish()
270 }
271}
272
273// ---------------------------------------------------------------------------
274// Tests
275// ---------------------------------------------------------------------------
276
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they were pushed.
    #[test]
    fn push_and_pop() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"alpha");
        let _ = rb.push(b"beta");
        let _ = rb.push(b"gamma");

        assert_eq!(rb.frame_count(), 3);
        assert_eq!(rb.try_pop().unwrap(), b"alpha");
        assert_eq!(rb.try_pop().unwrap(), b"beta");
        assert_eq!(rb.try_pop().unwrap(), b"gamma");
        assert!(rb.try_pop().is_none());
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` per
    /// frame and leaves the buffer empty.
    #[test]
    fn drain_all_format() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Parse: [u32 count][u32 len][bytes]...
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // Nothing follows the last frame.
        assert_eq!(blob.len(), offset2 + 4 + len2);

        // Buffer should be empty now.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    /// Pushing into a full buffer evicts from the front.
    #[test]
    fn overflow_drops_oldest() {
        // Capacity for exactly 2 frames of 4 bytes each:
        //   frame cost = 4 (overhead) + 4 (payload) = 8 bytes
        //   2 frames   = 16 bytes
        let rb = RingBuffer::new(16);

        let dropped = rb.push(b"aaaa"); // cost 8, total 8
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"bbbb"); // cost 8, total 16
        assert_eq!(dropped, 0);

        // Third push must drop the oldest to fit.
        let dropped = rb.push(b"cccc"); // drops "aaaa"
        assert_eq!(dropped, 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// Draining an empty buffer yields an empty blob (no count header).
    #[test]
    fn empty_drain() {
        let rb = RingBuffer::new(1024);
        let blob = rb.drain_all();
        assert!(blob.is_empty());
    }

    /// `frame_count` and `bytes_used` track pushes and pops.
    #[test]
    fn frame_count_and_bytes() {
        let rb = RingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        let _ = rb.push(b"abc"); // cost = 4 + 3 = 7
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        let _ = rb.push(b"de"); // cost = 4 + 2 = 6
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        let _ = rb.try_pop();
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    /// `clear` removes every frame and resets the byte counter.
    #[test]
    fn clear() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    /// A producer task and a consumer task share the buffer; every pushed
    /// frame must be accounted for.
    #[tokio::test]
    async fn concurrent_push_pop() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const FRAMES: u32 = 1000;

        let rb = Arc::new(RingBuffer::new(64 * 1024));
        let producer_done = Arc::new(AtomicBool::new(false));

        let producer = {
            let rb = Arc::clone(&rb);
            let producer_done = Arc::clone(&producer_done);
            tokio::spawn(async move {
                let mut evicted = 0usize;
                for i in 0..FRAMES {
                    evicted += rb.push(&i.to_le_bytes());
                }
                producer_done.store(true, Ordering::SeqCst);
                evicted
            })
        };

        let consumer = {
            let rb = Arc::clone(&rb);
            let producer_done = Arc::clone(&producer_done);
            tokio::spawn(async move {
                let mut popped = 0usize;
                loop {
                    // Read the flag *before* popping: if the producer had
                    // already finished and the pop still finds nothing, no
                    // further frame can arrive and the loop may stop. This
                    // bounds the loop even if frames were lost.
                    let finished = producer_done.load(Ordering::SeqCst);
                    match rb.try_pop() {
                        Some(_frame) => popped += 1,
                        None if finished => break,
                        // Yield to let the producer make progress.
                        None => tokio::task::yield_now().await,
                    }
                }
                popped
            })
        };

        let evicted = producer.await.unwrap();
        let popped = consumer.await.unwrap();

        // 1000 frames at 8 bytes each fit comfortably in 64 KB, so nothing
        // may be evicted and the consumer must have seen every frame.
        assert_eq!(evicted, 0);
        assert_eq!(popped + rb.frame_count(), FRAMES as usize);
    }

    /// A frame larger than the whole buffer is discarded without evicting
    /// anything.
    #[test]
    fn single_large_frame() {
        // Buffer capacity is 32 bytes. A frame of 100 bytes costs 104 bytes
        // — larger than capacity. It should be silently discarded.
        let rb = RingBuffer::new(32);
        let _ = rb.push(b"ok"); // cost 6, fits
        let dropped = rb.push(&[0xFFu8; 100]); // cost 104, too large
        assert_eq!(dropped, 0); // not counted as "dropped oldest"

        // The small frame should still be there.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    /// The buffer is reusable after a drain.
    #[test]
    fn drain_then_push() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        // Buffer is empty after drain; push more.
        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    /// An oversized push into a full buffer leaves the existing frame intact.
    #[test]
    fn overflow_cascade() {
        // Capacity for exactly one 4-byte frame (cost = 8).
        let rb = RingBuffer::new(8);

        let _ = rb.push(b"aaaa"); // cost 8, fills completely
        assert_eq!(rb.frame_count(), 1);

        // Push a larger frame (6 bytes, cost 10 > 8) — too large for buffer.
        let dropped = rb.push(&[0u8; 6]);
        // The frame cannot fit even in an empty buffer, so it's discarded.
        assert_eq!(dropped, 0);

        // Original frame should still be intact.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"aaaa");
    }

    /// `push_checked` distinguishes plain acceptance, acceptance with
    /// eviction, and outright rejection.
    #[test]
    fn push_checked_outcomes() {
        // Room for exactly two 4-byte frames (cost 8 each).
        let rb = RingBuffer::new(16);

        assert_eq!(rb.push_checked(b"aaaa"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"bbbb"), PushOutcome::Accepted(0));
        assert_eq!(rb.push_checked(b"cccc"), PushOutcome::Accepted(1));

        // 13-byte payload costs 17 > 16: rejected, buffer untouched.
        assert_eq!(rb.push_checked(&[0u8; 13]), PushOutcome::TooLarge);
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 16);
    }

    /// A capacity that cannot hold even a 1-byte frame is rejected.
    #[test]
    #[should_panic(expected = "capacity must be at least 5 bytes")]
    fn tiny_capacity_panics() {
        RingBuffer::new(4); // equal to DRAIN_FRAME_OVERHEAD, but less than DRAIN_FRAME_OVERHEAD + 1
    }

    /// The default constructor uses the 64 KB budget.
    #[test]
    fn with_default_capacity() {
        let rb = RingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}