Skip to main content

conduit_core/
queue.rs

1//! Bounded frame queue with guaranteed delivery.
2//!
3//! [`Queue`] is the counterpart to [`crate::RingBuffer`]: instead of silently
4//! dropping the oldest frames when the byte budget is exceeded, it rejects new
5//! pushes with [`crate::Error::ChannelFull`]. This makes it suitable for
6//! channels where every frame must be delivered (e.g. control messages,
7//! transaction logs).
8//!
9//! # Wire format (`drain_all`)
10//!
11//! The wire format is identical to [`crate::RingBuffer::drain_all`]:
12//!
13//! ```text
14//! [u32 LE frame_count]
15//! [u32 LE len_1][bytes_1]
16//! [u32 LE len_2][bytes_2]
17//! ...
18//! ```
19
20use std::sync::Mutex;
21
22use crate::Error;
23use crate::codec::DRAIN_FRAME_OVERHEAD;
24
25// ---------------------------------------------------------------------------
26// Inner
27// ---------------------------------------------------------------------------
28
29/// The unsynchronized interior of the queue.
30///
31/// Frames are stored pre-formatted in wire layout: `[u32 LE len][bytes]` per
32/// frame, so that `drain_all()` can emit the entire payload with a single
33/// memcpy instead of N×2 `extend_from_slice` calls.
34struct QueueInner {
35    /// Pre-formatted wire data: frames stored as [u32 LE len][bytes][u32 LE len][bytes]...
36    wire_data: Vec<u8>,
37    /// Number of frames currently stored.
38    frame_count: u32,
39    /// Start of live data in wire_data (frames before this offset have been popped).
40    read_pos: usize,
41    /// Total bytes used for capacity accounting: sum of (DRAIN_FRAME_OVERHEAD + frame.len()).
42    bytes_used: usize,
43    /// Maximum byte budget. `0` means unbounded.
44    max_bytes: usize,
45}
46
47impl QueueInner {
48    /// Create an empty inner buffer with the given byte budget.
49    fn new(max_bytes: usize) -> Self {
50        Self {
51            wire_data: Vec::new(),
52            frame_count: 0,
53            read_pos: 0,
54            bytes_used: 0,
55            max_bytes,
56        }
57    }
58
59    /// Cost of storing a single frame (length prefix + payload).
60    #[inline]
61    fn frame_cost(frame: &[u8]) -> usize {
62        DRAIN_FRAME_OVERHEAD + frame.len()
63    }
64}
65
66// ---------------------------------------------------------------------------
67// Queue
68// ---------------------------------------------------------------------------
69
70/// Thread-safe, bounded FIFO queue with backpressure.
71///
72/// Unlike [`crate::RingBuffer`], this queue never drops frames. When the byte
73/// budget would be exceeded, [`push`](Self::push) returns
74/// [`Error::ChannelFull`](crate::Error::ChannelFull) and the frame is rejected.
75/// A `max_bytes` of `0` means the queue is unbounded.
76///
77/// # Thread safety
78///
79/// All public methods take `&self` and synchronize via an internal [`Mutex`].
80pub struct Queue {
81    inner: Mutex<QueueInner>,
82}
83
84impl Queue {
85    /// Create a queue with the given byte limit.
86    ///
87    /// A `max_bytes` of `0` means unbounded — pushes will never fail due to
88    /// capacity.
89    ///
90    /// # Warning
91    ///
92    /// When `max_bytes` is `0` the queue has **no memory limit**. If the
93    /// producer pushes data faster than the consumer drains it, memory usage
94    /// will grow without bound, eventually causing an out-of-memory (OOM)
95    /// condition. Prefer a non-zero byte limit for production use and reserve
96    /// `0` (or [`Queue::unbounded`]) for cases where unbounded growth is
97    /// explicitly acceptable.
98    pub fn new(max_bytes: usize) -> Self {
99        Self {
100            inner: Mutex::new(QueueInner::new(max_bytes)),
101        }
102    }
103
104    /// Create an unbounded queue.
105    ///
106    /// Equivalent to `Queue::new(0)`.
107    ///
108    /// # Warning
109    ///
110    /// An unbounded queue has no memory limit. If the consumer cannot keep up
111    /// with the producer, memory usage will grow without bound. Prefer
112    /// [`Queue::new`] with a reasonable byte limit for production use.
113    pub fn unbounded() -> Self {
114        Self::new(0)
115    }
116
117    /// Push a frame into the queue.
118    ///
119    /// Returns `Ok(())` if the frame was accepted. Returns
120    /// [`Err(Error::ChannelFull)`](crate::Error::ChannelFull) if the frame
121    /// (plus its 4-byte length prefix) would exceed `max_bytes`. When
122    /// `max_bytes` is `0` (unbounded), pushes always succeed.
123    pub fn push(&self, frame: &[u8]) -> Result<(), Error> {
124        // Guard: frame length must fit in u32 (wire format invariant) and
125        // frame_cost must not overflow usize (relevant on 32-bit targets).
126        if frame.len() > u32::MAX as usize
127            || DRAIN_FRAME_OVERHEAD.checked_add(frame.len()).is_none()
128        {
129            return Err(Error::PayloadTooLarge(frame.len()));
130        }
131
132        let cost = QueueInner::frame_cost(frame);
133        let mut inner = crate::lock_or_recover(&self.inner);
134
135        if inner.max_bytes > 0 && inner.bytes_used + cost > inner.max_bytes {
136            return Err(Error::ChannelFull);
137        }
138
139        // Guard: frame count must fit in u32 (wire format uses u32 count header).
140        if inner.frame_count == u32::MAX {
141            return Err(Error::ChannelFull);
142        }
143
144        // Append frame in wire format: [u32 LE len][bytes].
145        inner
146            .wire_data
147            .extend_from_slice(&(frame.len() as u32).to_le_bytes());
148        inner.wire_data.extend_from_slice(frame);
149        inner.frame_count += 1;
150        inner.bytes_used = inner.bytes_used.saturating_add(cost);
151        Ok(())
152    }
153
154    /// Read one frame from the front of the queue (FIFO).
155    ///
156    /// Returns `None` if the queue is empty.
157    #[must_use]
158    pub fn try_pop(&self) -> Option<Vec<u8>> {
159        let mut inner = crate::lock_or_recover(&self.inner);
160        if inner.frame_count == 0 {
161            return None;
162        }
163        let len_bytes: [u8; 4] = inner.wire_data[inner.read_pos..inner.read_pos + 4]
164            .try_into()
165            .unwrap();
166        let payload_len = u32::from_le_bytes(len_bytes) as usize;
167        let payload_start = inner.read_pos + 4;
168        let frame = inner.wire_data[payload_start..payload_start + payload_len].to_vec();
169        let cost = DRAIN_FRAME_OVERHEAD + payload_len;
170        inner.read_pos += cost;
171        inner.frame_count -= 1;
172        inner.bytes_used -= cost;
173
174        // Compact: when empty, just reset; otherwise shift when read_pos
175        // exceeds half the allocation to prevent unbounded growth during
176        // steady pop/push workloads.
177        if inner.frame_count == 0 {
178            inner.wire_data.clear();
179            inner.read_pos = 0;
180        } else if inner.read_pos > inner.wire_data.len() / 2 {
181            let rp = inner.read_pos;
182            inner.wire_data.copy_within(rp.., 0);
183            let new_len = inner.wire_data.len() - rp;
184            inner.wire_data.truncate(new_len);
185            inner.read_pos = 0;
186        }
187
188        Some(frame)
189    }
190
191    /// Drain all queued frames into a single binary blob and clear the queue.
192    ///
193    /// # Wire format
194    ///
195    /// ```text
196    /// [u32 LE frame_count]
197    /// [u32 LE len_1][bytes_1]
198    /// [u32 LE len_2][bytes_2]
199    /// ...
200    /// ```
201    ///
202    /// Returns an empty `Vec` if the queue is empty.
203    #[must_use]
204    pub fn drain_all(&self) -> Vec<u8> {
205        // Take the pre-formatted wire data out under the lock, then prepend
206        // the frame count header without contention.
207        let (wire_data, read_pos, frame_count) = {
208            let mut inner = crate::lock_or_recover(&self.inner);
209            if inner.frame_count == 0 {
210                return Vec::new();
211            }
212            let wire_data = std::mem::take(&mut inner.wire_data);
213            let read_pos = inner.read_pos;
214            let frame_count = inner.frame_count;
215            inner.read_pos = 0;
216            inner.frame_count = 0;
217            inner.bytes_used = 0;
218            (wire_data, read_pos, frame_count)
219        };
220        // Lock released — build output with TWO extend_from_slice calls (was N×2).
221        let live_data = &wire_data[read_pos..];
222        let output_size = 4 + live_data.len();
223        let mut buf = Vec::with_capacity(output_size);
224        buf.extend_from_slice(&frame_count.to_le_bytes());
225        buf.extend_from_slice(live_data);
226        buf
227    }
228
229    /// Number of frames currently queued.
230    #[must_use]
231    pub fn frame_count(&self) -> usize {
232        crate::lock_or_recover(&self.inner).frame_count as usize
233    }
234
235    /// Number of bytes currently used (including per-frame length prefixes).
236    #[must_use]
237    pub fn bytes_used(&self) -> usize {
238        crate::lock_or_recover(&self.inner).bytes_used
239    }
240
241    /// Maximum byte budget (`0` means unbounded).
242    #[must_use]
243    pub fn max_bytes(&self) -> usize {
244        crate::lock_or_recover(&self.inner).max_bytes
245    }
246
247    /// Clear all queued frames.
248    pub fn clear(&self) {
249        let mut inner = crate::lock_or_recover(&self.inner);
250        inner.wire_data.clear();
251        inner.frame_count = 0;
252        inner.read_pos = 0;
253        inner.bytes_used = 0;
254    }
255}
256
257impl std::fmt::Debug for Queue {
258    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
259        let inner = crate::lock_or_recover(&self.inner);
260        f.debug_struct("Queue")
261            .field("frame_count", &inner.frame_count)
262            .field("bytes_used", &inner.bytes_used)
263            .field("max_bytes", &inner.max_bytes)
264            .finish()
265    }
266}
267
268// ---------------------------------------------------------------------------
269// Tests
270// ---------------------------------------------------------------------------
271
272#[cfg(test)]
273mod tests {
274    use super::*;
275
276    #[test]
277    fn push_and_pop() {
278        let q = Queue::new(1024);
279        q.push(b"alpha").unwrap();
280        q.push(b"beta").unwrap();
281        q.push(b"gamma").unwrap();
282
283        assert_eq!(q.frame_count(), 3);
284        assert_eq!(q.try_pop().unwrap(), b"alpha");
285        assert_eq!(q.try_pop().unwrap(), b"beta");
286        assert_eq!(q.try_pop().unwrap(), b"gamma");
287        assert!(q.try_pop().is_none());
288    }
289
290    #[test]
291    fn push_within_limit() {
292        // Frame cost = 4 (overhead) + 4 (payload) = 8 bytes.
293        // Two frames = 16 bytes, capacity = 16.
294        let q = Queue::new(16);
295        q.push(b"aaaa").unwrap(); // cost 8, total 8
296        q.push(b"bbbb").unwrap(); // cost 8, total 16
297        assert_eq!(q.frame_count(), 2);
298        assert_eq!(q.bytes_used(), 16);
299    }
300
301    #[test]
302    fn push_exceeds_limit() {
303        // Capacity for exactly 2 frames of 4 bytes.
304        let q = Queue::new(16);
305        q.push(b"aaaa").unwrap(); // cost 8
306        q.push(b"bbbb").unwrap(); // cost 8, total 16
307
308        // Third push should fail.
309        let err = q.push(b"cccc").unwrap_err();
310        assert!(matches!(err, Error::ChannelFull));
311        assert_eq!(err.to_string(), "channel full: byte limit reached");
312
313        // Original frames are intact.
314        assert_eq!(q.frame_count(), 2);
315        assert_eq!(q.try_pop().unwrap(), b"aaaa");
316        assert_eq!(q.try_pop().unwrap(), b"bbbb");
317    }
318
319    #[test]
320    fn drain_all_format() {
321        let q = Queue::new(1024);
322        q.push(b"hello").unwrap();
323        q.push(b"world").unwrap();
324
325        let blob = q.drain_all();
326
327        // Parse: [u32 count][u32 len][bytes]...
328        let count = u32::from_le_bytes(blob[0..4].try_into().unwrap());
329        assert_eq!(count, 2);
330
331        let len1 = u32::from_le_bytes(blob[4..8].try_into().unwrap()) as usize;
332        assert_eq!(len1, 5);
333        assert_eq!(&blob[8..8 + len1], b"hello");
334
335        let offset2 = 8 + len1;
336        let len2 = u32::from_le_bytes(blob[offset2..offset2 + 4].try_into().unwrap()) as usize;
337        assert_eq!(len2, 5);
338        assert_eq!(&blob[offset2 + 4..offset2 + 4 + len2], b"world");
339
340        // Queue should be empty now.
341        assert_eq!(q.frame_count(), 0);
342        assert_eq!(q.bytes_used(), 0);
343    }
344
345    #[test]
346    fn drain_frees_capacity() {
347        let q = Queue::new(16);
348        q.push(b"aaaa").unwrap(); // cost 8
349        q.push(b"bbbb").unwrap(); // cost 8, total 16
350
351        // Queue is full.
352        assert!(q.push(b"cccc").is_err());
353
354        // Drain frees everything.
355        let blob = q.drain_all();
356        assert!(!blob.is_empty());
357        assert_eq!(q.bytes_used(), 0);
358
359        // Now pushes succeed again.
360        q.push(b"dddd").unwrap();
361        q.push(b"eeee").unwrap();
362        assert_eq!(q.frame_count(), 2);
363    }
364
365    #[test]
366    fn unbounded_mode() {
367        let q = Queue::unbounded();
368        assert_eq!(q.max_bytes(), 0);
369
370        // Push a large number of frames — should never fail.
371        for i in 0u32..10_000 {
372            q.push(&i.to_le_bytes()).unwrap();
373        }
374        assert_eq!(q.frame_count(), 10_000);
375    }
376
377    #[test]
378    fn frame_count_and_bytes() {
379        let q = Queue::new(1024);
380
381        assert_eq!(q.frame_count(), 0);
382        assert_eq!(q.bytes_used(), 0);
383        assert_eq!(q.max_bytes(), 1024);
384
385        q.push(b"abc").unwrap(); // cost = 4 + 3 = 7
386        assert_eq!(q.frame_count(), 1);
387        assert_eq!(q.bytes_used(), 7);
388
389        q.push(b"de").unwrap(); // cost = 4 + 2 = 6
390        assert_eq!(q.frame_count(), 2);
391        assert_eq!(q.bytes_used(), 13);
392
393        let _ = q.try_pop();
394        assert_eq!(q.frame_count(), 1);
395        assert_eq!(q.bytes_used(), 6);
396    }
397
398    #[test]
399    fn clear() {
400        let q = Queue::new(1024);
401        q.push(b"one").unwrap();
402        q.push(b"two").unwrap();
403        q.push(b"three").unwrap();
404
405        assert_eq!(q.frame_count(), 3);
406        q.clear();
407        assert_eq!(q.frame_count(), 0);
408        assert_eq!(q.bytes_used(), 0);
409        assert!(q.try_pop().is_none());
410    }
411
412    #[test]
413    fn concurrent_push_pop() {
414        use std::sync::Arc;
415
416        let q = Arc::new(Queue::unbounded());
417        let q_producer = Arc::clone(&q);
418        let q_consumer = Arc::clone(&q);
419
420        let producer = std::thread::spawn(move || {
421            for i in 0u32..1000 {
422                q_producer.push(&i.to_le_bytes()).unwrap();
423            }
424        });
425
426        let consumer = std::thread::spawn(move || {
427            let mut popped = 0usize;
428            loop {
429                if q_consumer.try_pop().is_some() {
430                    popped += 1;
431                }
432                if popped >= 1000 {
433                    break;
434                }
435                // Yield to let the producer make progress.
436                std::thread::yield_now();
437            }
438            popped
439        });
440
441        producer.join().unwrap();
442        let consumer_popped = consumer.join().unwrap();
443
444        // Between the consumer and any remaining frames, we should account for
445        // all 1000 pushes.
446        let remaining = q.frame_count();
447        assert_eq!(consumer_popped + remaining, 1000);
448    }
449
450    #[test]
451    fn empty_drain() {
452        let q = Queue::new(1024);
453        let blob = q.drain_all();
454        assert!(blob.is_empty());
455    }
456
457    #[test]
458    fn drain_then_push() {
459        let q = Queue::new(1024);
460        q.push(b"first").unwrap();
461        let blob = q.drain_all();
462        assert!(!blob.is_empty());
463
464        // Queue is empty after drain; push more.
465        q.push(b"second").unwrap();
466        assert_eq!(q.frame_count(), 1);
467        assert_eq!(q.try_pop().unwrap(), b"second");
468    }
469}