dynomite/io/cbuf.rs
1//! Fixed-size SPSC ring buffer.
2//!
3//! The C engine uses the header-only `dyn_cbuf.h` macro family
4//! (`CBUF_Init`, `CBUF_Push`, `CBUF_Pop`, `CBUF_IsEmpty`,
5//! `CBUF_IsFull`) for the producer-single-consumer rings that connect
6//! the worker, gossip, and entropy threads. The macros require a
7//! power-of-two capacity smaller than the index type and a fixed
8//! element type chosen at compile time.
9//!
10//! The Rust port keeps the same shape with a generic [`CBuf<T>`] that
11//! wraps [`crossbeam_queue::ArrayQueue`]. `ArrayQueue` is a lock-free
12//! MPMC ring; the SPSC use case is the strictest restriction of MPMC,
13//! so the same container serves both. The crate is allocation-free on
14//! the hot path and is implemented entirely with safe Rust at the
15//! consumer boundary, so the workspace's `forbid(unsafe_code)` lint
16//! stays in effect.
17//!
18//! Capacity does not need to be a power of two; `ArrayQueue` accepts
19//! any non-zero `usize`. Callers porting the C macros should pass the
20//! same constant they used for `cbuf##_SIZE`.
21//!
22//! # Examples
23//!
24//! ```
25//! use dynomite::io::cbuf::CBuf;
26//!
27//! let q: CBuf<u32> = CBuf::new(4);
28//! q.push(1).unwrap();
29//! q.push(2).unwrap();
30//! assert_eq!(q.len(), 2);
31//! assert_eq!(q.pop(), Some(1));
32//! assert_eq!(q.pop(), Some(2));
33//! assert!(q.is_empty());
34//! ```
35
36use crossbeam_queue::ArrayQueue;
37
38/// Bounded SPSC ring queue.
39///
40/// `CBuf` mirrors the contract of the C `CBUF_*` macro family while
41/// remaining safe to use across multiple producers or consumers (the
42/// underlying `ArrayQueue` is MPMC). A new ring of capacity `N` admits
43/// up to `N` elements before [`push`](Self::push) starts returning
44/// `Err`.
45pub struct CBuf<T> {
46 inner: ArrayQueue<T>,
47}
48
49impl<T> CBuf<T> {
50 /// Create a ring with room for exactly `capacity` elements.
51 ///
52 /// # Panics
53 ///
54 /// Panics if `capacity == 0` (matching the precondition on
55 /// [`ArrayQueue::new`]).
56 ///
57 /// # Examples
58 ///
59 /// ```
60 /// use dynomite::io::cbuf::CBuf;
61 /// let q: CBuf<u8> = CBuf::new(8);
62 /// assert_eq!(q.capacity(), 8);
63 /// ```
64 pub fn new(capacity: usize) -> Self {
65 Self {
66 inner: ArrayQueue::new(capacity),
67 }
68 }
69
70 /// Capacity in elements. Mirrors `cbuf##_SIZE`.
71 pub fn capacity(&self) -> usize {
72 self.inner.capacity()
73 }
74
75 /// Append an element. Returns `Err(item)` when the ring is full,
76 /// echoing `try_send` on bounded channels. Mirrors `CBUF_Push`
77 /// (the C version overwrites silently on overflow; the Rust
78 /// version reports the failure so callers cannot lose data
79 /// inadvertently).
80 ///
81 /// # Examples
82 ///
83 /// ```
84 /// use dynomite::io::cbuf::CBuf;
85 /// let q: CBuf<u8> = CBuf::new(2);
86 /// q.push(1).unwrap();
87 /// q.push(2).unwrap();
88 /// assert_eq!(q.push(3), Err(3));
89 /// ```
90 pub fn push(&self, item: T) -> Result<(), T> {
91 self.inner.push(item)
92 }
93
94 /// Remove and return the head element. Mirrors `CBUF_Pop`.
95 ///
96 /// # Examples
97 ///
98 /// ```
99 /// use dynomite::io::cbuf::CBuf;
100 /// let q: CBuf<u8> = CBuf::new(2);
101 /// q.push(7).unwrap();
102 /// assert_eq!(q.pop(), Some(7));
103 /// assert_eq!(q.pop(), None);
104 /// ```
105 pub fn pop(&self) -> Option<T> {
106 self.inner.pop()
107 }
108
109 /// Number of elements currently buffered. Mirrors `CBUF_Len`.
110 pub fn len(&self) -> usize {
111 self.inner.len()
112 }
113
114 /// True when the ring contains no elements. Mirrors `CBUF_IsEmpty`.
115 pub fn is_empty(&self) -> bool {
116 self.inner.is_empty()
117 }
118
119 /// True when the ring is at capacity. Mirrors `CBUF_IsFull`.
120 pub fn is_full(&self) -> bool {
121 self.inner.is_full()
122 }
123}
124
125impl<T> std::fmt::Debug for CBuf<T> {
126 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
127 f.debug_struct("CBuf")
128 .field("capacity", &self.inner.capacity())
129 .field("len", &self.inner.len())
130 .finish()
131 }
132}
133
134#[cfg(test)]
135mod tests {
136 use super::*;
137
138 #[test]
139 fn push_pop_fifo_ordering() {
140 let q: CBuf<u32> = CBuf::new(8);
141 for i in 0..8u32 {
142 q.push(i).unwrap();
143 }
144 assert!(q.is_full());
145 for i in 0..8u32 {
146 assert_eq!(q.pop(), Some(i));
147 }
148 assert!(q.is_empty());
149 }
150
151 #[test]
152 fn push_returns_item_on_full() {
153 let q: CBuf<u32> = CBuf::new(2);
154 q.push(1).unwrap();
155 q.push(2).unwrap();
156 assert_eq!(q.push(3), Err(3));
157 }
158
159 #[test]
160 fn pop_returns_none_on_empty() {
161 let q: CBuf<u32> = CBuf::new(2);
162 assert_eq!(q.pop(), None);
163 }
164
165 #[test]
166 fn capacity_reports_initial_size() {
167 let q: CBuf<u32> = CBuf::new(13);
168 assert_eq!(q.capacity(), 13);
169 }
170}