Skip to main content

dynomite/io/
cbuf.rs

1//! Fixed-size SPSC ring buffer.
2//!
//! The C engine uses the header-only `dyn_cbuf.h` macro family
//! (`CBUF_Init`, `CBUF_Push`, `CBUF_Pop`, `CBUF_IsEmpty`,
//! `CBUF_IsFull`) for the single-producer, single-consumer rings that
//! connect the worker, gossip, and entropy threads. The macros require
//! a power-of-two capacity smaller than the range of the index type
//! and a fixed element type chosen at compile time.
9//!
10//! The Rust port keeps the same shape with a generic [`CBuf<T>`] that
11//! wraps [`crossbeam_queue::ArrayQueue`]. `ArrayQueue` is a lock-free
12//! MPMC ring; the SPSC use case is the strictest restriction of MPMC,
13//! so the same container serves both. The crate is allocation-free on
14//! the hot path and is implemented entirely with safe Rust at the
15//! consumer boundary, so the workspace's `forbid(unsafe_code)` lint
16//! stays in effect.
17//!
18//! Capacity does not need to be a power of two; `ArrayQueue` accepts
19//! any non-zero `usize`. Callers porting the C macros should pass the
20//! same constant they used for `cbuf##_SIZE`.
21//!
22//! # Examples
23//!
24//! ```
25//! use dynomite::io::cbuf::CBuf;
26//!
27//! let q: CBuf<u32> = CBuf::new(4);
28//! q.push(1).unwrap();
29//! q.push(2).unwrap();
30//! assert_eq!(q.len(), 2);
31//! assert_eq!(q.pop(), Some(1));
32//! assert_eq!(q.pop(), Some(2));
33//! assert!(q.is_empty());
34//! ```
35
36use crossbeam_queue::ArrayQueue;
37
38/// Bounded SPSC ring queue.
39///
40/// `CBuf` mirrors the contract of the C `CBUF_*` macro family while
41/// remaining safe to use across multiple producers or consumers (the
42/// underlying `ArrayQueue` is MPMC). A new ring of capacity `N` admits
43/// up to `N` elements before [`push`](Self::push) starts returning
44/// `Err`.
45pub struct CBuf<T> {
46    inner: ArrayQueue<T>,
47}
48
49impl<T> CBuf<T> {
50    /// Create a ring with room for exactly `capacity` elements.
51    ///
52    /// # Panics
53    ///
54    /// Panics if `capacity == 0` (matching the precondition on
55    /// [`ArrayQueue::new`]).
56    ///
57    /// # Examples
58    ///
59    /// ```
60    /// use dynomite::io::cbuf::CBuf;
61    /// let q: CBuf<u8> = CBuf::new(8);
62    /// assert_eq!(q.capacity(), 8);
63    /// ```
64    pub fn new(capacity: usize) -> Self {
65        Self {
66            inner: ArrayQueue::new(capacity),
67        }
68    }
69
70    /// Capacity in elements. Mirrors `cbuf##_SIZE`.
71    pub fn capacity(&self) -> usize {
72        self.inner.capacity()
73    }
74
75    /// Append an element. Returns `Err(item)` when the ring is full,
76    /// echoing `try_send` on bounded channels. Mirrors `CBUF_Push`
77    /// (the C version overwrites silently on overflow; the Rust
78    /// version reports the failure so callers cannot lose data
79    /// inadvertently).
80    ///
81    /// # Examples
82    ///
83    /// ```
84    /// use dynomite::io::cbuf::CBuf;
85    /// let q: CBuf<u8> = CBuf::new(2);
86    /// q.push(1).unwrap();
87    /// q.push(2).unwrap();
88    /// assert_eq!(q.push(3), Err(3));
89    /// ```
90    pub fn push(&self, item: T) -> Result<(), T> {
91        self.inner.push(item)
92    }
93
94    /// Remove and return the head element. Mirrors `CBUF_Pop`.
95    ///
96    /// # Examples
97    ///
98    /// ```
99    /// use dynomite::io::cbuf::CBuf;
100    /// let q: CBuf<u8> = CBuf::new(2);
101    /// q.push(7).unwrap();
102    /// assert_eq!(q.pop(), Some(7));
103    /// assert_eq!(q.pop(), None);
104    /// ```
105    pub fn pop(&self) -> Option<T> {
106        self.inner.pop()
107    }
108
109    /// Number of elements currently buffered. Mirrors `CBUF_Len`.
110    pub fn len(&self) -> usize {
111        self.inner.len()
112    }
113
114    /// True when the ring contains no elements. Mirrors `CBUF_IsEmpty`.
115    pub fn is_empty(&self) -> bool {
116        self.inner.is_empty()
117    }
118
119    /// True when the ring is at capacity. Mirrors `CBUF_IsFull`.
120    pub fn is_full(&self) -> bool {
121        self.inner.is_full()
122    }
123}
124
125impl<T> std::fmt::Debug for CBuf<T> {
126    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127        f.debug_struct("CBuf")
128            .field("capacity", &self.inner.capacity())
129            .field("len", &self.inner.len())
130            .finish()
131    }
132}
133
#[cfg(test)]
mod tests {
    use super::CBuf;

    /// Values leave the ring in the same order they entered it.
    #[test]
    fn push_pop_fifo_ordering() {
        let ring = CBuf::<u32>::new(8);
        (0..8u32).for_each(|value| ring.push(value).unwrap());
        assert!(ring.is_full());
        (0..8u32).for_each(|expected| assert_eq!(ring.pop(), Some(expected)));
        assert!(ring.is_empty());
    }

    /// A push against a full ring hands the rejected value back.
    #[test]
    fn push_returns_item_on_full() {
        let ring = CBuf::<u32>::new(2);
        for value in 1..=2u32 {
            ring.push(value).unwrap();
        }
        let rejected = ring.push(3);
        assert_eq!(rejected, Err(3));
    }

    /// Popping from a ring that never received a value yields `None`.
    #[test]
    fn pop_returns_none_on_empty() {
        let ring = CBuf::<u32>::new(2);
        let head = ring.pop();
        assert_eq!(head, None);
    }

    /// The reported capacity equals the size requested at construction.
    #[test]
    fn capacity_reports_initial_size() {
        let requested = 13;
        let ring = CBuf::<u32>::new(requested);
        assert_eq!(ring.capacity(), requested);
    }
}