Skip to main content

conduit_core/
queue.rs

1//! Bounded frame queue with guaranteed delivery.
2//!
3//! [`Queue`] is the counterpart to [`crate::RingBuffer`]: instead of silently
4//! dropping the oldest frames when the byte budget is exceeded, it rejects new
5//! pushes with [`crate::Error::ChannelFull`]. This makes it suitable for
6//! channels where every frame must be delivered (e.g. control messages,
7//! transaction logs).
8//!
9//! # Wire format (`drain_all`)
10//!
11//! The wire format is identical to [`crate::RingBuffer::drain_all`]:
12//!
13//! ```text
14//! [u32 LE frame_count]
15//! [u32 LE len_1][bytes_1]
16//! [u32 LE len_2][bytes_2]
17//! ...
18//! ```
19
20use std::collections::VecDeque;
21use std::sync::Mutex;
22
23use crate::Error;
24use crate::codec::DRAIN_FRAME_OVERHEAD;
25
26// ---------------------------------------------------------------------------
27// Inner
28// ---------------------------------------------------------------------------
29
/// Plain queue state with no synchronization of its own; [`Queue`] keeps it
/// behind a mutex.
struct QueueInner {
    /// Pending frames, oldest at the front and newest at the back.
    frames: VecDeque<Vec<u8>>,
    /// Running byte total over all pending frames, where every frame is
    /// charged `DRAIN_FRAME_OVERHEAD + frame.len()`.
    bytes_used: usize,
    /// Byte budget for the queue. A value of `0` disables the limit.
    max_bytes: usize,
}
39
40impl QueueInner {
41    /// Create an empty inner buffer with the given byte budget.
42    fn new(max_bytes: usize) -> Self {
43        Self {
44            frames: VecDeque::new(),
45            bytes_used: 0,
46            max_bytes,
47        }
48    }
49
50    /// Cost of storing a single frame (length prefix + payload).
51    #[inline]
52    fn frame_cost(frame: &[u8]) -> usize {
53        DRAIN_FRAME_OVERHEAD + frame.len()
54    }
55}
56
57// ---------------------------------------------------------------------------
58// Queue
59// ---------------------------------------------------------------------------
60
61/// Thread-safe, bounded FIFO queue with backpressure.
62///
63/// Unlike [`crate::RingBuffer`], this queue never drops frames. When the byte
64/// budget would be exceeded, [`push`](Self::push) returns
65/// [`Error::ChannelFull`](crate::Error::ChannelFull) and the frame is rejected.
66/// A `max_bytes` of `0` means the queue is unbounded.
67///
68/// # Thread safety
69///
70/// All public methods take `&self` and synchronize via an internal [`Mutex`].
71pub struct Queue {
72    inner: Mutex<QueueInner>,
73}
74
75impl Queue {
76    /// Create a queue with the given byte limit.
77    ///
78    /// A `max_bytes` of `0` means unbounded — pushes will never fail due to
79    /// capacity.
80    ///
81    /// # Warning
82    ///
83    /// When `max_bytes` is `0` the queue has **no memory limit**. If the
84    /// producer pushes data faster than the consumer drains it, memory usage
85    /// will grow without bound, eventually causing an out-of-memory (OOM)
86    /// condition. Prefer a non-zero byte limit for production use and reserve
87    /// `0` (or [`Queue::unbounded`]) for cases where unbounded growth is
88    /// explicitly acceptable.
89    pub fn new(max_bytes: usize) -> Self {
90        Self {
91            inner: Mutex::new(QueueInner::new(max_bytes)),
92        }
93    }
94
95    /// Create an unbounded queue.
96    ///
97    /// Equivalent to `Queue::new(0)`.
98    ///
99    /// # Warning
100    ///
101    /// An unbounded queue has no memory limit. If the consumer cannot keep up
102    /// with the producer, memory usage will grow without bound. Prefer
103    /// [`Queue::new`] with a reasonable byte limit for production use.
104    pub fn unbounded() -> Self {
105        Self::new(0)
106    }
107
108    /// Push a frame into the queue.
109    ///
110    /// Returns `Ok(())` if the frame was accepted. Returns
111    /// [`Err(Error::ChannelFull)`](crate::Error::ChannelFull) if the frame
112    /// (plus its 4-byte length prefix) would exceed `max_bytes`. When
113    /// `max_bytes` is `0` (unbounded), pushes always succeed.
114    pub fn push(&self, frame: &[u8]) -> Result<(), Error> {
115        let cost = QueueInner::frame_cost(frame);
116        let mut inner = crate::lock_or_recover(&self.inner);
117
118        if inner.max_bytes > 0 && inner.bytes_used + cost > inner.max_bytes {
119            return Err(Error::ChannelFull);
120        }
121
122        inner.frames.push_back(frame.to_vec());
123        inner.bytes_used = inner.bytes_used.saturating_add(cost);
124        Ok(())
125    }
126
127    /// Read one frame from the front of the queue (FIFO).
128    ///
129    /// Returns `None` if the queue is empty.
130    #[must_use]
131    pub fn try_pop(&self) -> Option<Vec<u8>> {
132        let mut inner = crate::lock_or_recover(&self.inner);
133        let frame = inner.frames.pop_front()?;
134        inner.bytes_used -= QueueInner::frame_cost(&frame);
135        Some(frame)
136    }
137
138    /// Drain all queued frames into a single binary blob and clear the queue.
139    ///
140    /// # Wire format
141    ///
142    /// ```text
143    /// [u32 LE frame_count]
144    /// [u32 LE len_1][bytes_1]
145    /// [u32 LE len_2][bytes_2]
146    /// ...
147    /// ```
148    ///
149    /// Returns an empty `Vec` if the queue is empty.
150    #[must_use]
151    pub fn drain_all(&self) -> Vec<u8> {
152        // Swap the frames out under the lock, then serialize without contention.
153        let (mut frames, bytes_used) = {
154            let mut inner = crate::lock_or_recover(&self.inner);
155            if inner.frames.is_empty() {
156                return Vec::new();
157            }
158            let frames = std::mem::take(&mut inner.frames);
159            let bytes_used = inner.bytes_used;
160            inner.bytes_used = 0;
161            (frames, bytes_used)
162        };
163        // Lock released — serialize without holding the mutex.
164        let output_size = 4usize.saturating_add(bytes_used);
165        let mut buf = Vec::with_capacity(output_size);
166
167        // Frame count header.
168        let count = frames.len() as u32;
169        buf.extend_from_slice(&count.to_le_bytes());
170
171        // Each frame: [u32 LE len][bytes].
172        for frame in frames.make_contiguous() {
173            let len = frame.len() as u32;
174            buf.extend_from_slice(&len.to_le_bytes());
175            buf.extend_from_slice(frame);
176        }
177
178        buf
179    }
180
181    /// Number of frames currently queued.
182    #[must_use]
183    pub fn frame_count(&self) -> usize {
184        crate::lock_or_recover(&self.inner).frames.len()
185    }
186
187    /// Number of bytes currently used (including per-frame length prefixes).
188    #[must_use]
189    pub fn bytes_used(&self) -> usize {
190        crate::lock_or_recover(&self.inner).bytes_used
191    }
192
193    /// Maximum byte budget (`0` means unbounded).
194    #[must_use]
195    pub fn max_bytes(&self) -> usize {
196        crate::lock_or_recover(&self.inner).max_bytes
197    }
198
199    /// Clear all queued frames.
200    pub fn clear(&self) {
201        let mut inner = crate::lock_or_recover(&self.inner);
202        inner.frames.clear();
203        inner.bytes_used = 0;
204    }
205}
206
207impl std::fmt::Debug for Queue {
208    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
209        let inner = crate::lock_or_recover(&self.inner);
210        f.debug_struct("Queue")
211            .field("frame_count", &inner.frames.len())
212            .field("bytes_used", &inner.bytes_used)
213            .field("max_bytes", &inner.max_bytes)
214            .finish()
215    }
216}
217
218// ---------------------------------------------------------------------------
219// Tests
220// ---------------------------------------------------------------------------
221
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames come back out in the order they went in.
    #[test]
    fn push_and_pop() {
        let q = Queue::new(1024);
        let inputs: [&[u8]; 3] = [b"alpha", b"beta", b"gamma"];

        for frame in inputs {
            q.push(frame).unwrap();
        }
        assert_eq!(q.frame_count(), 3);

        for expected in inputs {
            assert_eq!(q.try_pop().unwrap(), expected);
        }
        assert!(q.try_pop().is_none());
    }

    /// Filling the budget exactly is allowed.
    #[test]
    fn push_within_limit() {
        // Each 4-byte payload costs 4 (overhead) + 4 = 8 bytes, so two of
        // them land exactly on the 16-byte budget.
        let q = Queue::new(16);
        for frame in [b"aaaa", b"bbbb"] {
            q.push(frame).unwrap();
        }
        assert_eq!(q.frame_count(), 2);
        assert_eq!(q.bytes_used(), 16);
    }

    /// A push past the budget is rejected and leaves the queue untouched.
    #[test]
    fn push_exceeds_limit() {
        // Room for exactly two 4-byte frames (8 bytes each).
        let q = Queue::new(16);
        q.push(b"aaaa").unwrap();
        q.push(b"bbbb").unwrap();

        // The third frame does not fit.
        let rejected = q.push(b"cccc").unwrap_err();
        assert!(matches!(rejected, Error::ChannelFull));
        assert_eq!(rejected.to_string(), "channel full: byte limit reached");

        // The two accepted frames are still there, in order.
        assert_eq!(q.frame_count(), 2);
        assert_eq!(q.try_pop().unwrap(), b"aaaa");
        assert_eq!(q.try_pop().unwrap(), b"bbbb");
    }

    /// `drain_all` emits `[u32 count]` followed by `[u32 len][bytes]` pairs.
    #[test]
    fn drain_all_format() {
        let q = Queue::new(1024);
        q.push(b"hello").unwrap();
        q.push(b"world").unwrap();

        let blob = q.drain_all();
        // Decode a little-endian u32 starting at byte offset `at`.
        let read_u32 = |at: usize| u32::from_le_bytes(blob[at..at + 4].try_into().unwrap());

        assert_eq!(read_u32(0), 2);

        let first_len = read_u32(4) as usize;
        assert_eq!(first_len, 5);
        assert_eq!(&blob[8..8 + first_len], b"hello");

        let second_at = 8 + first_len;
        let second_len = read_u32(second_at) as usize;
        assert_eq!(second_len, 5);
        assert_eq!(&blob[second_at + 4..second_at + 4 + second_len], b"world");

        // Draining leaves nothing behind.
        assert_eq!(q.frame_count(), 0);
        assert_eq!(q.bytes_used(), 0);
    }

    /// Draining a full queue makes room for new pushes.
    #[test]
    fn drain_frees_capacity() {
        let q = Queue::new(16);
        q.push(b"aaaa").unwrap();
        q.push(b"bbbb").unwrap();

        // Budget exhausted: 2 x 8 bytes.
        assert!(q.push(b"cccc").is_err());

        // Drain releases the whole budget.
        let blob = q.drain_all();
        assert!(!blob.is_empty());
        assert_eq!(q.bytes_used(), 0);

        // Pushing works again.
        for frame in [b"dddd", b"eeee"] {
            q.push(frame).unwrap();
        }
        assert_eq!(q.frame_count(), 2);
    }

    /// An unbounded queue accepts an arbitrary number of frames.
    #[test]
    fn unbounded_mode() {
        let q = Queue::unbounded();
        assert_eq!(q.max_bytes(), 0);

        // None of these pushes may fail.
        (0u32..10_000).for_each(|i| q.push(&i.to_le_bytes()).unwrap());
        assert_eq!(q.frame_count(), 10_000);
    }

    /// The counters track pushes and pops frame by frame.
    #[test]
    fn frame_count_and_bytes() {
        let q = Queue::new(1024);

        assert_eq!(q.frame_count(), 0);
        assert_eq!(q.bytes_used(), 0);
        assert_eq!(q.max_bytes(), 1024);

        // 3-byte payload: cost 4 + 3 = 7.
        q.push(b"abc").unwrap();
        assert_eq!((q.frame_count(), q.bytes_used()), (1, 7));

        // 2-byte payload: cost 4 + 2 = 6, total 13.
        q.push(b"de").unwrap();
        assert_eq!((q.frame_count(), q.bytes_used()), (2, 13));

        // Popping "abc" releases its 7 bytes.
        let _ = q.try_pop();
        assert_eq!((q.frame_count(), q.bytes_used()), (1, 6));
    }

    /// `clear` empties the queue and zeroes the byte counter.
    #[test]
    fn clear() {
        let q = Queue::new(1024);
        let inputs: [&[u8]; 3] = [b"one", b"two", b"three"];
        for frame in inputs {
            q.push(frame).unwrap();
        }

        assert_eq!(q.frame_count(), 3);
        q.clear();
        assert_eq!(q.frame_count(), 0);
        assert_eq!(q.bytes_used(), 0);
        assert!(q.try_pop().is_none());
    }

    /// One producer and one consumer thread share the queue without losing frames.
    #[test]
    fn concurrent_push_pop() {
        use std::sync::Arc;

        let q = Arc::new(Queue::unbounded());
        let producer_handle = Arc::clone(&q);
        let consumer_handle = Arc::clone(&q);

        let producer = std::thread::spawn(move || {
            for i in 0u32..1000 {
                producer_handle.push(&i.to_le_bytes()).unwrap();
            }
        });

        let consumer = std::thread::spawn(move || {
            let mut received = 0usize;
            while received < 1000 {
                if consumer_handle.try_pop().is_some() {
                    received += 1;
                }
                if received < 1000 {
                    // Give the producer a chance to run.
                    std::thread::yield_now();
                }
            }
            received
        });

        producer.join().unwrap();
        let received = consumer.join().unwrap();

        // Everything pushed is either popped or still queued.
        let leftover = q.frame_count();
        assert_eq!(received + leftover, 1000);
    }

    /// Draining an empty queue yields an empty blob.
    #[test]
    fn empty_drain() {
        let q = Queue::new(1024);
        assert!(q.drain_all().is_empty());
    }

    /// The queue stays usable after a drain.
    #[test]
    fn drain_then_push() {
        let q = Queue::new(1024);
        q.push(b"first").unwrap();
        let blob = q.drain_all();
        assert!(!blob.is_empty());

        // Empty again after the drain, so a new frame is the only one present.
        q.push(b"second").unwrap();
        assert_eq!(q.frame_count(), 1);
        assert_eq!(q.try_pop().unwrap(), b"second");
    }
}