conduit_core/ringbuf.rs
1//! In-process ring buffer for high-frequency streaming.
2//!
3//! [`RingBuffer`] is the breakthrough component of tauri-conduit: an
4//! in-process circular buffer that lets the Rust backend stream binary frames
5//! to the WebView frontend without serialization, IPC, or inter-process shared
6//! memory. The custom protocol handler (`conduit://`) reads directly from it.
7//!
8//! # Design
9//!
10//! The buffer stores variable-length frames with a configurable byte budget
11//! (default 64 KB). When the budget is exceeded, the oldest frames are dropped
12//! to make room — this is lossy by design, because the JS consumer is expected
13//! to drain fast enough for real-time use cases (market data, sensor telemetry,
14//! audio buffers).
15//!
16//! # Wire format (`drain_all`)
17//!
18//! ```text
19//! [u32 LE frame_count]
20//! [u32 LE len_1][bytes_1]
21//! [u32 LE len_2][bytes_2]
22//! ...
23//! ```
24
25use std::sync::Mutex;
26
27use crate::codec::DRAIN_FRAME_OVERHEAD;
28
29// ---------------------------------------------------------------------------
30// Constants
31// ---------------------------------------------------------------------------
32
/// Byte budget used when no explicit capacity is requested: 64 KiB (65 536 bytes).
const DEFAULT_CAPACITY: usize = 64 << 10;
35
36// ---------------------------------------------------------------------------
37// Inner
38// ---------------------------------------------------------------------------
39
/// Unsynchronized state of the ring buffer; the owning type wraps it in a
/// mutex.
///
/// Every frame is kept in its final wire layout (`[u32 LE len][bytes]`), so
/// draining only has to copy one contiguous byte range instead of
/// re-encoding each frame.
struct Inner {
    /// Back-to-back encoded frames: `[u32 LE len][bytes][u32 LE len][bytes]...`.
    wire_data: Vec<u8>,
    /// How many live frames `wire_data` currently holds.
    frame_count: u32,
    /// Offset of the first live frame in `wire_data`; everything before it
    /// belongs to frames that were already evicted or popped.
    read_pos: usize,
    /// Budget consumed by the live frames, i.e. the sum of
    /// `DRAIN_FRAME_OVERHEAD + payload length` over all of them.
    bytes_used: usize,
    /// Upper bound for `bytes_used`.
    capacity: usize,
}
57
58impl Inner {
59 /// Create an empty inner buffer with the given byte budget.
60 fn new(capacity: usize) -> Self {
61 Self {
62 wire_data: Vec::new(),
63 frame_count: 0,
64 read_pos: 0,
65 bytes_used: 0,
66 capacity,
67 }
68 }
69
70 /// Cost of storing a single frame (length prefix + payload).
71 #[inline]
72 fn frame_cost(frame: &[u8]) -> usize {
73 DRAIN_FRAME_OVERHEAD + frame.len()
74 }
75
76 /// Drop the oldest frame by advancing `read_pos`. Returns `true` if
77 /// a frame was actually removed.
78 fn drop_oldest(&mut self) -> bool {
79 if self.frame_count == 0 {
80 return false;
81 }
82 let len_bytes: [u8; 4] = self.wire_data[self.read_pos..self.read_pos + 4]
83 .try_into()
84 .unwrap();
85 let payload_len = u32::from_le_bytes(len_bytes) as usize;
86 let cost = DRAIN_FRAME_OVERHEAD + payload_len;
87 self.read_pos += cost;
88 self.frame_count -= 1;
89 self.bytes_used -= cost;
90 true
91 }
92}
93
94// ---------------------------------------------------------------------------
95// PushOutcome
96// ---------------------------------------------------------------------------
97
/// Result reported by [`RingBuffer::push_checked`].
///
/// Lets the caller tell apart a frame that was stored (with or without
/// evicting older frames) from a frame that was rejected because the buffer
/// can never hold it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum PushOutcome {
    /// The frame was stored. The payload is how many older frames had to be
    /// evicted to make room for it; `0` means nothing was lost.
    Accepted(usize),
    /// The frame can never fit in this buffer, not even an empty one. It was
    /// discarded and the buffer contents were left as they were.
    TooLarge,
}
111
112// ---------------------------------------------------------------------------
113// RingBuffer
114// ---------------------------------------------------------------------------
115
116/// Thread-safe, in-process circular buffer for streaming binary frames.
117///
118/// Frames are variable-length byte slices stored with a u32 LE length prefix.
119/// The buffer enforces a byte budget; when a push would exceed the budget the
120/// oldest frames are silently dropped (lossy back-pressure).
121///
122/// # Thread safety
123///
124/// All public methods take `&self` and synchronize via an internal [`Mutex`].
125/// Contention is expected to be low: typically one producer thread and one
126/// consumer (the custom protocol handler draining on a `fetch` call).
127pub struct RingBuffer {
128 inner: Mutex<Inner>,
129}
130
131impl RingBuffer {
132 /// Create a ring buffer with the given byte capacity.
133 ///
134 /// # Panics
135 ///
136 /// Panics if `capacity` is less than `DRAIN_FRAME_OVERHEAD + 1` (5 bytes),
137 /// since at least a 1-byte frame must be storable.
138 pub fn new(capacity: usize) -> Self {
139 assert!(
140 capacity > DRAIN_FRAME_OVERHEAD,
141 "capacity must be at least {} bytes (DRAIN_FRAME_OVERHEAD + 1)",
142 DRAIN_FRAME_OVERHEAD + 1,
143 );
144 Self {
145 inner: Mutex::new(Inner::new(capacity)),
146 }
147 }
148
149 /// Create a ring buffer with the default capacity (64 KB).
150 pub fn with_default_capacity() -> Self {
151 Self::new(DEFAULT_CAPACITY)
152 }
153
154 /// Push a frame into the buffer.
155 ///
156 /// If the frame (plus its 4-byte length prefix) would exceed the byte
157 /// budget, the oldest frames are dropped until there is room. Returns the
158 /// number of frames that were dropped to make space.
159 ///
160 /// If the frame itself is larger than the total capacity it is silently
161 /// discarded and the return value is `0`.
162 pub fn push(&self, frame: &[u8]) -> usize {
163 match self.push_checked(frame) {
164 PushOutcome::Accepted(n) => n,
165 PushOutcome::TooLarge => 0,
166 }
167 }
168
169 /// Push a frame with a richer outcome report.
170 ///
171 /// Like [`push`](Self::push), but returns [`PushOutcome::TooLarge`] when
172 /// the frame can never fit, instead of silently returning `0`.
173 #[must_use]
174 pub fn push_checked(&self, frame: &[u8]) -> PushOutcome {
175 // Guard: frame length must fit in u32 (wire format invariant) and
176 // frame_cost must not overflow usize (relevant on 32-bit targets).
177 if frame.len() > u32::MAX as usize
178 || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none()
179 {
180 return PushOutcome::TooLarge;
181 }
182
183 let cost = Inner::frame_cost(frame);
184 let mut inner = crate::lock_or_recover(&self.inner);
185
186 // Frame too large for this buffer — discard it.
187 if cost > inner.capacity {
188 return PushOutcome::TooLarge;
189 }
190
191 let mut dropped = 0usize;
192 while inner.bytes_used + cost > inner.capacity {
193 if !inner.drop_oldest() {
194 break;
195 }
196 dropped += 1;
197 }
198
199 // Compact if read_pos is more than half the allocated buffer.
200 if inner.read_pos > 0 && inner.read_pos > inner.wire_data.len() / 2 {
201 let rp = inner.read_pos;
202 inner.wire_data.copy_within(rp.., 0);
203 let new_len = inner.wire_data.len() - rp;
204 inner.wire_data.truncate(new_len);
205 inner.read_pos = 0;
206 }
207
208 // Guard: frame count must fit in u32 (wire format uses u32 count header).
209 if inner.frame_count == u32::MAX {
210 return PushOutcome::TooLarge;
211 }
212
213 // Append frame in wire format: [u32 LE len][bytes].
214 inner
215 .wire_data
216 .extend_from_slice(&(frame.len() as u32).to_le_bytes());
217 inner.wire_data.extend_from_slice(frame);
218 inner.frame_count += 1;
219 inner.bytes_used += cost;
220 PushOutcome::Accepted(dropped)
221 }
222
223 /// Drain all buffered frames into a single binary blob and clear the
224 /// buffer.
225 ///
226 /// # Wire format
227 ///
228 /// ```text
229 /// [u32 LE frame_count]
230 /// [u32 LE len_1][bytes_1]
231 /// [u32 LE len_2][bytes_2]
232 /// ...
233 /// ```
234 ///
235 /// Returns an empty `Vec` if the buffer is empty.
236 #[must_use]
237 pub fn drain_all(&self) -> Vec<u8> {
238 // Take the pre-formatted wire data out under the lock, then prepend
239 // the frame count header without contention.
240 let (wire_data, read_pos, frame_count) = {
241 let mut inner = crate::lock_or_recover(&self.inner);
242 if inner.frame_count == 0 {
243 return Vec::new();
244 }
245 let wire_data = std::mem::take(&mut inner.wire_data);
246 let read_pos = inner.read_pos;
247 let frame_count = inner.frame_count;
248 inner.read_pos = 0;
249 inner.frame_count = 0;
250 inner.bytes_used = 0;
251 (wire_data, read_pos, frame_count)
252 };
253 // Lock released — build output with TWO extend_from_slice calls (was N×2).
254 let live_data = &wire_data[read_pos..];
255 let output_size = 4 + live_data.len();
256 let mut buf = Vec::with_capacity(output_size);
257 buf.extend_from_slice(&frame_count.to_le_bytes());
258 buf.extend_from_slice(live_data);
259 buf
260 }
261
262 /// Read one frame from the front of the buffer (FIFO).
263 ///
264 /// Returns `None` if the buffer is empty.
265 #[must_use]
266 pub fn try_pop(&self) -> Option<Vec<u8>> {
267 let mut inner = crate::lock_or_recover(&self.inner);
268 if inner.frame_count == 0 {
269 return None;
270 }
271 let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4]
272 .try_into()
273 .unwrap();
274 let payload_len = u32::from_le_bytes(len_bytes) as usize;
275 let payload_start = inner.read_pos + 4;
276 let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec();
277 let cost = DRAIN_FRAME_OVERHEAD + payload_len;
278 inner.read_pos += cost;
279 inner.frame_count -= 1;
280 inner.bytes_used -= cost;
281
282 // Compact when empty.
283 if inner.frame_count == 0 {
284 inner.wire_data.clear();
285 inner.read_pos = 0;
286 }
287
288 Some(frame)
289 }
290
291 /// Number of frames currently buffered.
292 #[must_use]
293 pub fn frame_count(&self) -> usize {
294 crate::lock_or_recover(&self.inner).frame_count as usize
295 }
296
297 /// Number of bytes currently used (including per-frame length prefixes).
298 #[must_use]
299 pub fn bytes_used(&self) -> usize {
300 crate::lock_or_recover(&self.inner).bytes_used
301 }
302
303 /// Total byte capacity of the buffer.
304 #[must_use]
305 pub fn capacity(&self) -> usize {
306 crate::lock_or_recover(&self.inner).capacity
307 }
308
309 /// Clear all buffered frames.
310 pub fn clear(&self) {
311 let mut inner = crate::lock_or_recover(&self.inner);
312 inner.wire_data.clear();
313 inner.frame_count = 0;
314 inner.read_pos = 0;
315 inner.bytes_used = 0;
316 }
317}
318
319impl std::fmt::Debug for RingBuffer {
320 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
321 let inner = crate::lock_or_recover(&self.inner);
322 f.debug_struct("RingBuffer")
323 .field("frame_count", &inner.frame_count)
324 .field("bytes_used", &inner.bytes_used)
325 .field("capacity", &inner.capacity)
326 .finish()
327 }
328}
329
330// ---------------------------------------------------------------------------
331// Tests
332// ---------------------------------------------------------------------------
333
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out of `try_pop` in insertion order.
    #[test]
    fn push_and_pop() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"alpha");
        let _ = rb.push(b"beta");
        let _ = rb.push(b"gamma");

        assert_eq!(rb.frame_count(), 3);
        assert_eq!(rb.try_pop().unwrap(), b"alpha");
        assert_eq!(rb.try_pop().unwrap(), b"beta");
        assert_eq!(rb.try_pop().unwrap(), b"gamma");
        assert!(rb.try_pop().is_none());
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` per frame.
    #[test]
    fn drain_all_format() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"hello");
        let _ = rb.push(b"world");

        let blob = rb.drain_all();

        // Parse: [u32 count][u32 len][bytes]...
        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
        assert_eq!(count, 2);

        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
        assert_eq!(len1, 5);
        assert_eq!(&blob[8..8 + len1], b"hello");

        let offset2 = 8 + len1;
        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
        assert_eq!(len2, 5);
        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");

        // Buffer should be empty now.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
    }

    #[test]
    fn overflow_drops_oldest() {
        // Capacity for exactly 2 frames of 4 bytes each:
        //   frame cost = 4 (overhead) + 4 (payload) = 8 bytes
        //   2 frames   = 16 bytes
        let rb = RingBuffer::new(16);

        let dropped = rb.push(b"aaaa"); // cost 8, total 8
        assert_eq!(dropped, 0);

        let dropped = rb.push(b"bbbb"); // cost 8, total 16
        assert_eq!(dropped, 0);

        // Third push must drop the oldest to fit.
        let dropped = rb.push(b"cccc"); // drops "aaaa"
        assert_eq!(dropped, 1);

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.try_pop().unwrap(), b"bbbb");
        assert_eq!(rb.try_pop().unwrap(), b"cccc");
    }

    /// One push may have to evict several older frames at once.
    #[test]
    fn push_checked_reports_evictions() {
        let rb = RingBuffer::new(16);
        assert_eq!(rb.push_checked(b"aaaa"), PushOutcome::Accepted(0)); // total 8
        assert_eq!(rb.push_checked(b"bbbb"), PushOutcome::Accepted(0)); // total 16

        // Cost 12 only fits once both older frames are gone.
        assert_eq!(rb.push_checked(b"cccccccc"), PushOutcome::Accepted(2));

        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 12);
        assert_eq!(rb.try_pop().unwrap(), b"cccccccc");
    }

    #[test]
    fn empty_drain() {
        let rb = RingBuffer::new(1024);
        let blob = rb.drain_all();
        assert!(blob.is_empty());
    }

    /// After an eviction the drained blob must contain only the surviving
    /// frames, not the bytes of the evicted one.
    #[test]
    fn drain_after_eviction_omits_evicted_frames() {
        let rb = RingBuffer::new(16);
        let _ = rb.push(b"aaaa");
        let _ = rb.push(b"bbbb");
        let _ = rb.push(b"cccc"); // evicts "aaaa"

        let mut expected = Vec::new();
        expected.extend_from_slice(&2u32.to_le_bytes());
        expected.extend_from_slice(&4u32.to_le_bytes());
        expected.extend_from_slice(b"bbbb");
        expected.extend_from_slice(&4u32.to_le_bytes());
        expected.extend_from_slice(b"cccc");
        assert_eq!(rb.drain_all(), expected);

        // The buffer is reusable afterwards.
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.push(b"dddd"), 0);
        assert_eq!(rb.try_pop().unwrap(), b"dddd");
    }

    #[test]
    fn frame_count_and_bytes() {
        let rb = RingBuffer::new(1024);

        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert_eq!(rb.capacity(), 1024);

        let _ = rb.push(b"abc"); // cost = 4 + 3 = 7
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 7);

        let _ = rb.push(b"de"); // cost = 4 + 2 = 6
        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 13);

        let _ = rb.try_pop();
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
    }

    /// A zero-length frame is a valid frame that costs only its prefix.
    #[test]
    fn empty_frame_round_trip() {
        let rb = RingBuffer::new(1024);
        assert_eq!(rb.push_checked(b""), PushOutcome::Accepted(0));
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 4);
        assert!(rb.try_pop().unwrap().is_empty());
        assert_eq!(rb.frame_count(), 0);

        let _ = rb.push(b"");
        assert_eq!(rb.drain_all(), vec![1u8, 0, 0, 0, 0, 0, 0, 0]);
    }

    /// Popping and pushing alternately moves the read position and triggers
    /// compaction; FIFO order must survive both.
    #[test]
    fn interleaved_pop_and_push_keeps_order() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"a");
        let _ = rb.push(b"b");
        assert_eq!(rb.try_pop().unwrap(), b"a");
        let _ = rb.push(b"c");
        assert_eq!(rb.try_pop().unwrap(), b"b");
        let _ = rb.push(b"d");

        assert_eq!(rb.frame_count(), 2);
        assert_eq!(rb.bytes_used(), 10);
        assert_eq!(rb.try_pop().unwrap(), b"c");
        assert_eq!(rb.try_pop().unwrap(), b"d");
        assert!(rb.try_pop().is_none());
    }

    #[test]
    fn clear() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"one");
        let _ = rb.push(b"two");
        let _ = rb.push(b"three");

        assert_eq!(rb.frame_count(), 3);
        rb.clear();
        assert_eq!(rb.frame_count(), 0);
        assert_eq!(rb.bytes_used(), 0);
        assert!(rb.try_pop().is_none());
    }

    #[tokio::test]
    async fn concurrent_push_pop() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const FRAMES: u32 = 1000;

        let rb = Arc::new(RingBuffer::new(64 * 1024));
        let producer_done = Arc::new(AtomicBool::new(false));

        let producer = {
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            tokio::spawn(async move {
                for i in 0..FRAMES {
                    let _ = rb.push(&i.to_le_bytes());
                }
                done.store(true, Ordering::SeqCst);
            })
        };

        let consumer = {
            let rb = Arc::clone(&rb);
            let done = Arc::clone(&producer_done);
            tokio::spawn(async move {
                let mut popped = 0usize;
                loop {
                    if rb.try_pop().is_some() {
                        popped += 1;
                    } else if done.load(Ordering::SeqCst) {
                        // Producer has finished and the buffer looked empty:
                        // stop instead of spinning forever if a frame was
                        // lost. Anything that raced in is counted below.
                        break;
                    } else {
                        // Yield to let the producer make progress.
                        tokio::task::yield_now().await;
                    }
                }
                popped
            })
        };

        producer.await.unwrap();
        let consumer_popped = consumer.await.unwrap();

        // With 64 KB capacity and 8 bytes per frame nothing is evicted, so
        // popped frames plus frames still buffered must account for every
        // push.
        let remaining = rb.frame_count();
        assert_eq!(consumer_popped + remaining, FRAMES as usize);
    }

    #[test]
    fn single_large_frame() {
        // Buffer capacity is 32 bytes. A frame of 100 bytes costs 104 bytes
        // — larger than capacity. It should be silently discarded.
        let rb = RingBuffer::new(32);
        let _ = rb.push(b"ok"); // cost 6, fits
        let dropped = rb.push(&[0xFFu8; 100]); // cost 104, too large
        assert_eq!(dropped, 0); // not counted as "dropped oldest"

        // `push_checked` makes the rejection explicit.
        assert_eq!(rb.push_checked(&[0xFFu8; 100]), PushOutcome::TooLarge);

        // The small frame should still be there.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.bytes_used(), 6);
        assert_eq!(rb.try_pop().unwrap(), b"ok");
    }

    #[test]
    fn drain_then_push() {
        let rb = RingBuffer::new(1024);
        let _ = rb.push(b"first");
        let blob = rb.drain_all();
        assert!(!blob.is_empty());

        // Buffer is empty after drain; push more.
        let _ = rb.push(b"second");
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"second");
    }

    #[test]
    fn overflow_cascade() {
        // Capacity for exactly one 4-byte frame (cost = 8).
        let rb = RingBuffer::new(8);

        let _ = rb.push(b"aaaa"); // cost 8, fills completely
        assert_eq!(rb.frame_count(), 1);

        // Push a larger frame (6 bytes, cost 10 > 8) — too large for buffer.
        let dropped = rb.push(&[0u8; 6]);
        // The frame cannot fit even in an empty buffer, so it's discarded.
        assert_eq!(dropped, 0);
        assert_eq!(rb.push_checked(&[0u8; 6]), PushOutcome::TooLarge);

        // Original frame should still be intact.
        assert_eq!(rb.frame_count(), 1);
        assert_eq!(rb.try_pop().unwrap(), b"aaaa");
    }

    #[test]
    #[should_panic(expected = "capacity must be at least 5 bytes")]
    fn tiny_capacity_panics() {
        RingBuffer::new(4); // equal to DRAIN_FRAME_OVERHEAD, but less than DRAIN_FRAME_OVERHEAD + 1
    }

    #[test]
    fn with_default_capacity() {
        let rb = RingBuffer::with_default_capacity();
        assert_eq!(rb.capacity(), 64 * 1024);
    }
}