Skip to main content

frozen_core/
mpscq.rs

1//! A low-latency implementation of MPSC (multi-producer single-consumer) queue
2//!
3//! ## Benchmarks
4//!
5//! Observed latency for push and drain operations,
6//!
7//! _NOTE:_ All measurements include end-to-end operation cost (incl. allocations & deallocation)
8//!
9//! ```md
10//! | Operation | Latency (average)  |
11//! |:----------|:-------------------|
12//! | Push      | ~36 nanoseconds    |
13//! | Drain     | ~37 nanoseconds    |
14//! ```
15//!
16//! Environment used for benching,
17//!
18//! * OS: NixOS (WSL2)
19//! * Architecture: x86_64
20//! * Memory: 8 GiB RAM (DDR4)
21//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
22//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
23//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
24//!
25//! ## Example
26//!
27//! ```
28//! use frozen_core::mpscq::MPSCQueue;
29//!
30//! let queue = MPSCQueue::<usize>::default();
31//!
32//! queue.push(0x10);
33//! queue.push(0x20);
34//!
35//! let batch: Vec<usize> = queue.drain();
36//! assert_eq!(batch.len(), 2);
37//! ```
38
39use core::{ptr, sync::atomic};
40
41/// A low-latency implementation of MPSC (multi-producer single-consumer) queue
42///
43/// ## Example
44///
45/// ```
46/// use frozen_core::mpscq::MPSCQueue;
47///
48/// let queue = MPSCQueue::<usize>::default();
49///
50/// queue.push(0x100);
51/// queue.push(0x200);
52///
53/// let batch: Vec<usize> = queue.drain();
54///
55/// assert_eq!(batch.len(), 2);
56/// assert_eq!(batch, vec![0x200, 0x100]);
57/// ```
58#[derive(Debug)]
59pub struct MPSCQueue<T> {
60    head: atomic::AtomicPtr<Node<T>>,
61}
62
63unsafe impl<T> Send for MPSCQueue<T> {}
64unsafe impl<T> Sync for MPSCQueue<T> {}
65
66impl<T> MPSCQueue<T> {
67    /// Checks if the [`MPSCQueue`] is currently empty
68    ///
69    /// ## Example
70    ///
71    /// ```
72    /// use frozen_core::mpscq::MPSCQueue;
73    ///
74    /// let queue = MPSCQueue::<usize>::default();
75    /// assert!(queue.is_empty());
76    /// ```
77    #[inline(always)]
78    pub fn is_empty(&self) -> bool {
79        self.head.load(atomic::Ordering::Acquire).is_null()
80    }
81
82    /// Push an entry into the [`MPSCQueue`]
83    ///
84    /// ## Example
85    ///
86    /// ```
87    /// use frozen_core::mpscq::MPSCQueue;
88    ///
89    /// let queue = MPSCQueue::<usize>::default();
90    /// queue.push(0x0A);
91    ///
92    /// let batch = queue.drain();
93    ///
94    /// assert_eq!(batch.len(), 1);
95    /// assert_eq!(batch, vec![0x0A]);
96    /// ```
97    #[inline(always)]
98    pub fn push(&self, value: T) {
99        let mut head = self.head.load(atomic::Ordering::Relaxed);
100        let node = Node::new(value);
101
102        loop {
103            unsafe { (*node).next = head };
104            match self.head.compare_exchange_weak(
105                head,
106                node,
107                atomic::Ordering::AcqRel,
108                atomic::Ordering::Relaxed,
109            ) {
110                Ok(_) => return,
111                Err(h) => head = h,
112            }
113        }
114    }
115
116    /// Drain the current batch of items from the [`MPSCQueue`]
117    ///
118    /// ## Ordering
119    ///
120    /// The queue internally uses a linked list like structure for storing `T`, therefore entries
121    /// are stored in LIFO order.
122    ///
123    /// ## Multiple Consumers
124    ///
125    /// By design [`MPSCQueue::drain`] is thread safe, but draining from multiple threads is
126    /// semantically incorrect. In the MPSC model, there should only be a single consumer, while
127    /// there can be many producers.
128    ///
129    /// ## Example
130    ///
131    /// ```
132    /// use frozen_core::mpscq::MPSCQueue;
133    ///
134    /// let queue = MPSCQueue::<u8>::default();
135    ///
136    /// queue.push(0x0A);
137    /// queue.push(0x0B);
138    /// queue.push(0x0C);
139    ///
140    /// let batch = queue.drain();
141    ///
142    /// assert_eq!(batch.len(), 3);
143    /// assert_eq!(batch, vec![0x0C, 0x0B, 0x0A]);
144    /// ```
145    #[inline(always)]
146    pub fn drain(&self) -> Vec<T> {
147        let mut out = Vec::new();
148        let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
149
150        while !node.is_null() {
151            let boxed = unsafe { Box::from_raw(node) };
152            node = boxed.next;
153            out.push(boxed.value);
154        }
155
156        out
157    }
158}
159
160impl<T> Default for MPSCQueue<T> {
161    fn default() -> Self {
162        Self { head: atomic::AtomicPtr::new(ptr::null_mut()) }
163    }
164}
165
166impl<T> Drop for MPSCQueue<T> {
167    fn drop(&mut self) {
168        let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::Relaxed);
169        while !node.is_null() {
170            unsafe {
171                let boxed = Box::from_raw(node);
172                node = boxed.next;
173            }
174        }
175    }
176}
177
178struct Node<T> {
179    next: *mut Node<T>,
180    value: T,
181}
182
183impl<T> Node<T> {
184    fn new(value: T) -> *mut Self {
185        Box::into_raw(Box::new(Self { next: ptr::null_mut(), value }))
186    }
187}
188
189#[cfg(test)]
190mod tests {
191    use super::*;
192    use std::sync::{Arc, Barrier};
193    use std::thread;
194
195    mod basics {
196        use super::*;
197
198        #[test]
199        fn ok_push_drain_single() {
200            let q = MPSCQueue::default();
201            q.push(1usize);
202
203            let batch = q.drain();
204            assert_eq!(batch, vec![1]);
205        }
206
207        #[test]
208        fn ok_push_drain_multiple() {
209            let q = MPSCQueue::default();
210
211            q.push(1);
212            q.push(2);
213            q.push(3);
214
215            let batch = q.drain();
216            assert_eq!(batch.len(), 3);
217            assert_eq!(batch, vec![3, 2, 1]);
218        }
219
220        #[test]
221        fn ok_drain_empty_when_queue_empty() {
222            let q: MPSCQueue<usize> = MPSCQueue::default();
223            let batch = q.drain();
224            assert!(batch.is_empty());
225        }
226    }
227
228    mod empty {
229        use super::*;
230
231        #[test]
232        fn ok_is_empty_true_on_init() {
233            let q: MPSCQueue<usize> = MPSCQueue::default();
234            assert!(q.is_empty());
235        }
236
237        #[test]
238        fn ok_is_empty_false_after_push() {
239            let q = MPSCQueue::default();
240            q.push(1);
241            assert!(!q.is_empty());
242        }
243
244        #[test]
245        fn ok_is_empty_true_after_drain() {
246            let q = MPSCQueue::default();
247
248            q.push(1);
249            q.push(2);
250
251            let _ = q.drain();
252            assert!(q.is_empty());
253        }
254    }
255
256    mod cycles {
257        use super::*;
258
259        #[test]
260        fn ok_single_push_drain_cycles() {
261            let q = MPSCQueue::default();
262            for i in 0..0x400 {
263                q.push(i);
264                let batch = q.drain();
265
266                assert_eq!(batch.len(), 1);
267                assert_eq!(batch[0], i);
268            }
269        }
270
271        #[test]
272        fn ok_multi_push_drain_cycles() {
273            let q = MPSCQueue::default();
274            for _ in 0..0x200 {
275                for i in 0..0x0A {
276                    q.push(i);
277                }
278
279                let batch = q.drain();
280                assert_eq!(batch.len(), 0x0A);
281            }
282        }
283    }
284
285    mod concurrency {
286        use super::*;
287
288        const THREADS: usize = 0x0A;
289        const ITERS: usize = 0x2000;
290
291        #[test]
292        fn ok_multi_tx_push() {
293            let q = Arc::new(MPSCQueue::default());
294
295            let mut handles = Vec::new();
296            for _ in 0..THREADS {
297                let q = q.clone();
298                handles.push(thread::spawn(move || {
299                    for i in 0..ITERS {
300                        q.push(i);
301                    }
302                }));
303            }
304
305            for h in handles {
306                h.join().unwrap();
307            }
308
309            let batch = q.drain();
310            assert_eq!(batch.len(), THREADS * ITERS);
311        }
312
313        #[test]
314        fn ok_multi_tx_push_high_contention() {
315            let q = Arc::new(MPSCQueue::default());
316            let barrier = Arc::new(Barrier::new(THREADS * 2));
317
318            let mut handles = Vec::new();
319
320            for _ in 0..(THREADS * 2) {
321                let q = q.clone();
322                let barrier = barrier.clone();
323
324                handles.push(thread::spawn(move || {
325                    barrier.wait();
326
327                    for i in 0..(ITERS * 2) {
328                        q.push(i);
329                    }
330                }));
331            }
332
333            for h in handles {
334                h.join().unwrap();
335            }
336
337            let batch = q.drain();
338            assert_eq!(batch.len(), (THREADS * 2) * (ITERS * 2));
339        }
340
341        #[test]
342        fn ok_multi_tx_push_drain() {
343            let q = Arc::new(MPSCQueue::default());
344            let producer = {
345                let q = q.clone();
346                thread::spawn(move || {
347                    for i in 0..0x8000 {
348                        q.push(i);
349                    }
350                })
351            };
352
353            let consumer = {
354                let q = q.clone();
355                thread::spawn(move || {
356                    let mut total = 0usize;
357                    while total < 0x8000 {
358                        let batch = q.drain();
359                        total += batch.len();
360                    }
361
362                    total
363                })
364            };
365
366            producer.join().unwrap();
367            let total = consumer.join().unwrap();
368
369            assert_eq!(total, 0x8000);
370        }
371    }
372}