// bbqueue/lib.rs

//! # BBQueue
//!
//! BBQueue, short for "BipBuffer Queue", is a Single Producer, Single Consumer,
//! lockless, `no_std`, thread-safe queue based on [BipBuffers]. For more info on
//! the design of the lock-free algorithm used by bbqueue, see [this blog post].
//!
//! [BipBuffers]: https://www.codeproject.com/articles/The-Bip-Buffer-The-Circular-Buffer-with-a-Twist
//! [this blog post]: https://ferrous-systems.com/blog/lock-free-ring-buffer/
//!
//! BBQueue is designed (primarily) to be a First-In, First-Out queue for use with DMA on embedded
//! systems.
//!
//! While Circular/Ring Buffers allow you to send data between two threads (or from an interrupt to
//! main code), you must push the data one piece at a time. With BBQueue, you are instead granted a
//! block of contiguous memory, which can be filled (or emptied) by a DMA engine.
//!
//! ## Local usage
//!
//! ```rust
//! // The "Churrasco" flavor has inline storage, hardware atomic
//! // support, no async support, and is not reference counted.
//! use bbqueue::nicknames::Churrasco;
//!
//! // Create a buffer with six bytes of storage
//! let bb: Churrasco<6> = Churrasco::new();
//! let prod = bb.stream_producer();
//! let cons = bb.stream_consumer();
//!
//! // Request space for one byte
//! let mut wgr = prod.grant_exact(1).unwrap();
//!
//! // Set the data
//! wgr[0] = 123;
//!
//! assert_eq!(wgr.len(), 1);
//!
//! // Make the data ready for consuming
//! wgr.commit(1);
//!
//! // Read all available bytes
//! let rgr = cons.read().unwrap();
//!
//! assert_eq!(rgr[0], 123);
//!
//! // Release the space for later writes
//! rgr.release(1);
//! ```
//!
//! ## Static usage
//!
//! ```rust
//! use bbqueue::nicknames::Churrasco;
//! use std::{thread::{sleep, spawn}, time::Duration};
//!
//! // Create a buffer with six bytes of storage
//! static BB: Churrasco<6> = Churrasco::new();
//!
//! fn receiver() {
//!     let cons = BB.stream_consumer();
//!     loop {
//!         if let Ok(rgr) = cons.read() {
//!             assert_eq!(rgr.len(), 1);
//!             assert_eq!(rgr[0], 123);
//!             rgr.release(1);
//!             break;
//!         }
//!         // don't do this in real code, use Notify!
//!         sleep(Duration::from_millis(10));
//!     }
//! }
//!
//! fn main() {
//!     let prod = BB.stream_producer();
//!
//!     // spawn the consumer
//!     let hdl = spawn(receiver);
//!
//!     // Request space for one byte
//!     let mut wgr = prod.grant_exact(1).unwrap();
//!
//!     // Set the data
//!     wgr[0] = 123;
//!
//!     assert_eq!(wgr.len(), 1);
//!
//!     // Make the data ready for consuming
//!     wgr.commit(1);
//!
//!     // make sure the receiver terminated
//!     hdl.join().unwrap();
//! }
//! ```
//!
//! ## Nicknames
//!
//! bbqueue uses generics to customize the data structure in four main ways:
//!
//! * Whether the byte storage is inline (and const-generic), or heap allocated
//! * Whether the queue is polling-only, or supports async/await sending/receiving
//! * Whether the queue uses a lock-free algorithm with CAS atomics, or uses a critical section
//!   (for targets that don't have CAS atomics)
//! * Whether the queue is reference counted, allowing Producer and Consumer halves to be passed
//!   around without lifetimes.
//!
//! See the [`nicknames`](crate::nicknames) module for all sixteen variants.
//!
//! ## Stability
//!
//! `bbqueue` v0.6 is a breaking change from the older "classic" v0.5 interfaces. The intent is to
//! have a few minor breaking changes in early 2026, and to get to v1.0 as quickly as possible.

112#![cfg_attr(not(any(test, feature = "std")), no_std)]
113#![deny(missing_docs)]
114#![deny(warnings)]
115
116#[cfg(feature = "alloc")]
117extern crate alloc;
118
119/// Type aliases for different generic configurations
120///
121pub mod nicknames;
122
123/// Producer and consumer interfaces
124///
125pub mod prod_cons;
126
127/// Queue storage
128///
129mod queue;
130pub use queue::BBQueue;
131#[cfg(feature = "alloc")]
132pub use queue::ArcBBQueue;
133
134/// Generic traits
135///
136pub mod traits;
137
138/// Re-export of external types/traits
139///
140pub mod export {
141    pub use const_init::ConstInit;
142}
143
/// Unit tests for the stream and framed producer/consumer interfaces, in both
/// polling and async (tokio-driven) form.
#[cfg(all(test, feature = "alloc"))]
mod test {
    use core::{ops::Deref, time::Duration};

    use crate::{
        queue::{ArcBBQueue, BBQueue},
        traits::{
            coordination::cas::AtomicCoord,
            notifier::maitake::MaiNotSpsc,
            storage::{BoxedSlice, Inline},
        },
    };

    /// Longest time an async test waits for one of its spawned tasks.
    ///
    /// The writer tasks sleep for 500ms before committing, so this leaves
    /// plenty of headroom while still failing the test instead of hanging.
    const TASK_TIMEOUT: Duration = Duration::from_secs(5);

    /// Awaits `task`, panicking if it does not finish within [`TASK_TIMEOUT`]
    /// or if the task itself panicked.
    async fn join_with_timeout(task: tokio::task::JoinHandle<()>) {
        tokio::time::timeout(TASK_TIMEOUT, task)
            .await
            .expect("task did not finish before the timeout")
            .expect("task panicked");
    }

    /// Builds queues over a static inline buffer, a borrowed inline buffer and
    /// a boxed slice, and obtains stream producer/consumer handles from each.
    #[cfg(target_has_atomic = "ptr")]
    #[test]
    fn ux() {
        use crate::traits::notifier::polling::Polling;

        static BBQ: BBQueue<Inline<64>, AtomicCoord, Polling> = BBQueue::new();
        let _ = BBQ.stream_producer();
        let _ = BBQ.stream_consumer();

        let buf2 = Inline::<64>::new();
        let bbq2: BBQueue<_, AtomicCoord, Polling> = BBQueue::new_with_storage(&buf2);
        let _ = bbq2.stream_producer();
        let _ = bbq2.stream_consumer();

        let buf3 = BoxedSlice::new(64);
        let bbq3: BBQueue<_, AtomicCoord, Polling> = BBQueue::new_with_storage(buf3);
        let _ = bbq3.stream_producer();
        let _ = bbq3.stream_consumer();
    }

    /// Commits eight bytes through the stream interface, then reads them back
    /// while releasing four bytes at a time until the queue reports no data.
    #[cfg(target_has_atomic = "ptr")]
    #[test]
    fn smoke() {
        use crate::traits::notifier::polling::Polling;

        static BBQ: BBQueue<Inline<64>, AtomicCoord, Polling> = BBQueue::new();
        let prod = BBQ.stream_producer();
        let cons = BBQ.stream_consumer();

        let write_once = &[0x01, 0x02, 0x03, 0x04, 0x11, 0x12, 0x13, 0x14];
        let mut wgr = prod.grant_exact(8).unwrap();
        wgr.copy_from_slice(write_once);
        wgr.commit(8);

        // The first read exposes everything that was committed.
        let rgr = cons.read().unwrap();
        assert_eq!(rgr.deref(), write_once.as_slice());
        rgr.release(4);

        // Only the first half was released, so the second half is still there.
        let rgr = cons.read().unwrap();
        assert_eq!(rgr.deref(), &write_once[4..]);
        rgr.release(4);

        assert!(cons.read().is_err());
    }

    /// Grants an eight-byte frame, commits only six bytes of it, and checks
    /// that the framed consumer reads back exactly those six bytes.
    #[cfg(target_has_atomic = "ptr")]
    #[test]
    fn smoke_framed() {
        use crate::traits::notifier::polling::Polling;

        static BBQ: BBQueue<Inline<64>, AtomicCoord, Polling> = BBQueue::new();
        let prod = BBQ.framed_producer();
        let cons = BBQ.framed_consumer();

        let write_once = &[0x01, 0x02, 0x03, 0x04, 0x11, 0x12];
        let mut wgr = prod.grant(8).unwrap();
        wgr[..6].copy_from_slice(write_once);
        wgr.commit(6);

        let rgr = cons.read().unwrap();
        assert_eq!(rgr.deref(), write_once.as_slice());
        rgr.release();

        assert!(cons.read().is_err());
    }

    /// Writes raw bytes with the stream producer and checks that the framed
    /// consumer returns an error rather than a frame for both bad inputs.
    #[cfg(target_has_atomic = "ptr")]
    #[test]
    fn framed_misuse() {
        use crate::traits::notifier::polling::Polling;

        static BBQ: BBQueue<Inline<64>, AtomicCoord, Polling> = BBQueue::new();
        let prod = BBQ.stream_producer();
        let cons = BBQ.framed_consumer();

        // Bad grant one: HUGE header value
        let write_once = &[0xFF, 0xFF, 0x03, 0x04, 0x11, 0x12];
        let mut wgr = prod.grant_exact(6).unwrap();
        wgr[..6].copy_from_slice(write_once);
        wgr.commit(6);

        assert!(cons.read().is_err());

        {
            // Clear the bad grant
            let cons2 = BBQ.stream_consumer();
            let rgr = cons2.read().unwrap();
            rgr.release(6);
        }

        // Bad grant two: too small of a grant
        let write_once = &[0x00];
        let mut wgr = prod.grant_exact(1).unwrap();
        wgr[..1].copy_from_slice(write_once);
        wgr.commit(1);

        assert!(cons.read().is_err());
    }

    /// Async stream round trip on a static queue: the reader task awaits data
    /// that the writer task commits after a delay.
    #[tokio::test]
    async fn asink() {
        static BBQ: BBQueue<Inline<64>, AtomicCoord, MaiNotSpsc> = BBQueue::new();
        let prod = BBQ.stream_producer();
        let cons = BBQ.stream_consumer();

        let rxfut = tokio::task::spawn(async move {
            let rgr = cons.wait_read().await;
            assert_eq!(rgr.deref(), &[1, 2, 3]);
        });

        let txfut = tokio::task::spawn(async move {
            // Delay the write so the reader task has to wait for it.
            tokio::time::sleep(Duration::from_millis(500)).await;
            let mut wgr = prod.grant_exact(3).unwrap();
            wgr.copy_from_slice(&[1, 2, 3]);
            wgr.commit(3);
        });

        // Bounded waits: a missed wake-up fails the test instead of hanging it.
        join_with_timeout(rxfut).await;
        join_with_timeout(txfut).await;
    }

    /// Async framed round trip on a static queue: the reader task awaits a
    /// frame that the writer task commits after a delay.
    #[tokio::test]
    async fn asink_framed() {
        static BBQ: BBQueue<Inline<64>, AtomicCoord, MaiNotSpsc> = BBQueue::new();
        let prod = BBQ.framed_producer();
        let cons = BBQ.framed_consumer();

        let rxfut = tokio::task::spawn(async move {
            let rgr = cons.wait_read().await;
            assert_eq!(rgr.deref(), &[1, 2, 3]);
        });

        let txfut = tokio::task::spawn(async move {
            // Delay the write so the reader task has to wait for it.
            tokio::time::sleep(Duration::from_millis(500)).await;
            let mut wgr = prod.grant(3).unwrap();
            wgr.copy_from_slice(&[1, 2, 3]);
            wgr.commit(3);
        });

        // Bounded waits: a missed wake-up fails the test instead of hanging it.
        join_with_timeout(rxfut).await;
        join_with_timeout(txfut).await;
    }

    /// Async stream round trip through an `ArcBBQueue` with inline storage.
    #[tokio::test]
    async fn arc1() {
        let bbq: ArcBBQueue<Inline<64>, AtomicCoord, MaiNotSpsc> =
            ArcBBQueue::new_with_storage(Inline::new());
        let prod = bbq.stream_producer();
        let cons = bbq.stream_consumer();

        let rxfut = tokio::task::spawn(async move {
            let rgr = cons.wait_read().await;
            assert_eq!(rgr.deref(), &[1, 2, 3]);
        });

        let txfut = tokio::task::spawn(async move {
            // Delay the write so the reader task has to wait for it.
            tokio::time::sleep(Duration::from_millis(500)).await;
            let mut wgr = prod.grant_exact(3).unwrap();
            wgr.copy_from_slice(&[1, 2, 3]);
            wgr.commit(3);
        });

        // Bounded waits: a missed wake-up fails the test instead of hanging it.
        join_with_timeout(rxfut).await;
        join_with_timeout(txfut).await;
    }

    /// Async stream round trip through an `ArcBBQueue` with boxed-slice
    /// storage; the queue handle is dropped explicitly at the end.
    #[tokio::test]
    async fn arc2() {
        let bbq: ArcBBQueue<BoxedSlice, AtomicCoord, MaiNotSpsc> =
            ArcBBQueue::new_with_storage(BoxedSlice::new(64));
        let prod = bbq.stream_producer();
        let cons = bbq.stream_consumer();

        let rxfut = tokio::task::spawn(async move {
            let rgr = cons.wait_read().await;
            assert_eq!(rgr.deref(), &[1, 2, 3]);
        });

        let txfut = tokio::task::spawn(async move {
            // Delay the write so the reader task has to wait for it.
            tokio::time::sleep(Duration::from_millis(500)).await;
            let mut wgr = prod.grant_exact(3).unwrap();
            wgr.copy_from_slice(&[1, 2, 3]);
            wgr.commit(3);
        });

        // Bounded waits: a missed wake-up fails the test instead of hanging it.
        join_with_timeout(rxfut).await;
        join_with_timeout(txfut).await;

        drop(bbq);
    }
}
353}