Skip to main content

frozen_core/
mpscq.rs

//! A low-latency implementation of an MPSC (multi-producer single-consumer) queue
//!
//! ## Benchmarks
//!
//! Observed latency for push and drain operations:
//!
//! _NOTE:_ All measurements include the end-to-end operation cost (incl. allocation & deallocation)
//!
//! ```md
//! | Operation | Latency (average)  |
//! |:----------|:-------------------|
//! | Push      | ~36 nanoseconds    |
//! | Drain     | ~37 nanoseconds    |
//! ```
//!
//! Environment used for benchmarking:
//!
//! * OS: NixOS (WSL2)
//! * Architecture: x86_64
//! * Memory: 8 GiB RAM (DDR4)
//! * Rust: rustc 1.86.0 w/ cargo 1.86.0
//! * Kernel: Linux 6.6.87.2-microsoft-standard-WSL2
//! * CPU: Intel® Core™ i5-10300H @ 2.50GHz (4C / 8T)
//!
//! ## Example
//!
//! ```
//! use frozen_core::mpscq::MPSCQueue;
//!
//! let queue = MPSCQueue::<usize>::default();
//!
//! queue.push(0x10);
//! queue.push(0x20);
//!
//! let batch: Vec<usize> = queue.drain();
//! assert_eq!(batch.len(), 2);
//! ```
38
39use core::{ptr, sync::atomic};
40
41/// A low-latency implementation of MPSC (multi-producer single-consumer) queue
42///
43/// ## Example
44///
45/// ```
46/// use frozen_core::mpscq::MPSCQueue;
47///
48/// let queue = MPSCQueue::<usize>::default();
49///
50/// queue.push(0x100);
51/// queue.push(0x200);
52///
53/// let batch: Vec<usize> = queue.drain();
54///
55/// assert_eq!(batch.len(), 2);
56/// assert_eq!(batch, vec![0x200, 0x100]);
57/// ```
58#[derive(Debug)]
59pub struct MPSCQueue<T> {
60    head: atomic::AtomicPtr<Node<T>>,
61}
62
63unsafe impl<T> Send for MPSCQueue<T> {}
64unsafe impl<T> Sync for MPSCQueue<T> {}
65
66impl<T> MPSCQueue<T> {
67    /// Checks if the [`MPSCQueue`] is currently empty
68    ///
69    /// ## Example
70    ///
71    /// ```
72    /// use frozen_core::mpscq::MPSCQueue;
73    ///
74    /// let queue = MPSCQueue::<usize>::default();
75    /// assert!(queue.is_empty());
76    /// ```
77    #[inline(always)]
78    pub fn is_empty(&self) -> bool {
79        self.head.load(atomic::Ordering::Acquire).is_null()
80    }
81
82    /// Push an entry into the [`MPSCQueue`]
83    ///
84    /// ## Example
85    ///
86    /// ```
87    /// use frozen_core::mpscq::MPSCQueue;
88    ///
89    /// let queue = MPSCQueue::<usize>::default();
90    /// queue.push(0x0A);
91    ///
92    /// let batch = queue.drain();
93    ///
94    /// assert_eq!(batch.len(), 1);
95    /// assert_eq!(batch, vec![0x0A]);
96    /// ```
97    #[inline(always)]
98    pub fn push(&self, value: T) {
99        let mut head = self.head.load(atomic::Ordering::Relaxed);
100        let node = Node::new(value);
101
102        loop {
103            unsafe { (*node).next = head };
104            match self.head.compare_exchange_weak(head, node, atomic::Ordering::AcqRel, atomic::Ordering::Relaxed) {
105                Ok(_) => return,
106                Err(h) => head = h,
107            }
108        }
109    }
110
111    /// Drain the current batch of items from the [`MPSCQueue`]
112    ///
113    /// ## Ordering
114    ///
115    /// The queue internally uses a linked list like structure for storing `T`, therefore entries
116    /// are stored in LIFO order.
117    ///
118    /// ## Multiple Consumers
119    ///
120    /// By design [`MPSCQueue::drain`] is thread safe, but draining from multiple threads is
121    /// semantically incorrect. In the MPSC model, there should only be a single consumer, while
122    /// there can be many producers.
123    ///
124    /// ## Example
125    ///
126    /// ```
127    /// use frozen_core::mpscq::MPSCQueue;
128    ///
129    /// let queue = MPSCQueue::<u8>::default();
130    ///
131    /// queue.push(0x0A);
132    /// queue.push(0x0B);
133    /// queue.push(0x0C);
134    ///
135    /// let batch = queue.drain();
136    ///
137    /// assert_eq!(batch.len(), 3);
138    /// assert_eq!(batch, vec![0x0C, 0x0B, 0x0A]);
139    /// ```
140    #[inline(always)]
141    pub fn drain(&self) -> Vec<T> {
142        let mut out = Vec::new();
143        let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
144
145        while !node.is_null() {
146            let boxed = unsafe { Box::from_raw(node) };
147            node = boxed.next;
148            out.push(boxed.value);
149        }
150
151        out
152    }
153}
154
155impl<T> Default for MPSCQueue<T> {
156    fn default() -> Self {
157        Self { head: atomic::AtomicPtr::new(ptr::null_mut()) }
158    }
159}
160
161impl<T> Drop for MPSCQueue<T> {
162    fn drop(&mut self) {
163        let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::Relaxed);
164        while !node.is_null() {
165            unsafe {
166                let boxed = Box::from_raw(node);
167                node = boxed.next;
168            }
169        }
170    }
171}
172
/// One heap-allocated link of the queue's singly linked list.
struct Node<T> {
    /// The node that was the head when this one was pushed, or null for the oldest entry.
    next: *mut Node<T>,
    /// The payload carried by this link.
    value: T,
}

impl<T> Node<T> {
    /// Boxes `value` into an unlinked node (`next` is null) and returns it as a raw pointer.
    ///
    /// The allocation is released only when the pointer is handed back to `Box::from_raw`.
    fn new(value: T) -> *mut Self {
        let unlinked = Self { next: ptr::null_mut(), value };
        let boxed = Box::new(unlinked);

        Box::into_raw(boxed)
    }
}
183
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;

    /// Single-threaded push/drain behaviour.
    mod basics {
        use super::*;

        #[test]
        fn ok_push_drain_single() {
            let q = MPSCQueue::default();
            q.push(1usize);

            let batch = q.drain();
            assert_eq!(batch, vec![1]);
        }

        #[test]
        fn ok_push_drain_multiple() {
            let q = MPSCQueue::default();

            q.push(1);
            q.push(2);
            q.push(3);

            let batch = q.drain();
            assert_eq!(batch.len(), 3);
            // Newest entry first.
            assert_eq!(batch, vec![3, 2, 1]);
        }

        #[test]
        fn ok_drain_empty_when_queue_empty() {
            let q: MPSCQueue<usize> = MPSCQueue::default();
            let batch = q.drain();
            assert!(batch.is_empty());
        }
    }

    /// `is_empty` before and after pushes and drains.
    mod empty {
        use super::*;

        #[test]
        fn ok_is_empty_true_on_init() {
            let q: MPSCQueue<usize> = MPSCQueue::default();
            assert!(q.is_empty());
        }

        #[test]
        fn ok_is_empty_false_after_push() {
            let q = MPSCQueue::default();
            q.push(1);
            assert!(!q.is_empty());
        }

        #[test]
        fn ok_is_empty_true_after_drain() {
            let q = MPSCQueue::default();

            q.push(1);
            q.push(2);

            let _ = q.drain();
            assert!(q.is_empty());
        }
    }

    /// Repeated reuse of one queue across many push/drain rounds.
    mod cycles {
        use super::*;

        #[test]
        fn ok_single_push_drain_cycles() {
            let q = MPSCQueue::default();
            for i in 0..0x400 {
                q.push(i);
                let batch = q.drain();

                assert_eq!(batch.len(), 1);
                assert_eq!(batch[0], i);
            }
        }

        #[test]
        fn ok_multi_push_drain_cycles() {
            const BATCH: usize = 0x0A;

            let q = MPSCQueue::default();
            let newest_first: Vec<usize> = (0..BATCH).rev().collect();

            for _ in 0..0x200 {
                for i in 0..BATCH {
                    q.push(i);
                }

                let batch = q.drain();
                assert_eq!(batch.len(), BATCH);
                // Every round must hand back exactly what was pushed, newest first, with
                // nothing carried over from the previous round.
                assert_eq!(batch, newest_first);
                assert!(q.is_empty());
            }
        }
    }

    /// Multi-threaded producers, with and without a concurrent consumer.
    mod concurrency {
        use super::*;
        use std::sync::atomic::{AtomicBool, Ordering};

        const THREADS: usize = 0x0A;
        const ITERS: usize = 0x2000;

        /// Asserts that `batch` holds every value in `0..distinct` exactly `copies` times.
        ///
        /// A bare length check would still pass if one entry were lost while another was
        /// duplicated; counting each value rules that out.
        fn assert_every_value_present(batch: &[usize], distinct: usize, copies: usize) {
            assert_eq!(batch.len(), distinct * copies);

            let mut hits = vec![0usize; distinct];
            for &value in batch {
                hits[value] += 1;
            }

            assert!(
                hits.iter().all(|&count| count == copies),
                "some value was not drained exactly {copies} time(s)"
            );
        }

        #[test]
        fn ok_multi_tx_push() {
            let q = Arc::new(MPSCQueue::default());

            let handles: Vec<_> = (0..THREADS)
                .map(|_| {
                    let q = Arc::clone(&q);
                    thread::spawn(move || {
                        for i in 0..ITERS {
                            q.push(i);
                        }
                    })
                })
                .collect();

            for h in handles {
                h.join().unwrap();
            }

            let batch = q.drain();
            assert_every_value_present(&batch, ITERS, THREADS);
        }

        #[test]
        fn ok_multi_tx_push_high_contention() {
            const PRODUCERS: usize = THREADS * 2;
            const PER_PRODUCER: usize = ITERS * 2;

            let q = Arc::new(MPSCQueue::default());
            // Releases all producers at once so they hammer the head pointer together.
            let barrier = Arc::new(Barrier::new(PRODUCERS));

            let handles: Vec<_> = (0..PRODUCERS)
                .map(|_| {
                    let q = Arc::clone(&q);
                    let barrier = Arc::clone(&barrier);

                    thread::spawn(move || {
                        barrier.wait();

                        for i in 0..PER_PRODUCER {
                            q.push(i);
                        }
                    })
                })
                .collect();

            for h in handles {
                h.join().unwrap();
            }

            let batch = q.drain();
            assert_every_value_present(&batch, PER_PRODUCER, PRODUCERS);
        }

        #[test]
        fn ok_multi_tx_push_drain() {
            const TOTAL: usize = 0x8000;

            let q = Arc::new(MPSCQueue::default());
            let producer_done = Arc::new(AtomicBool::new(false));

            let producer = {
                let q = Arc::clone(&q);
                thread::spawn(move || {
                    for i in 0..TOTAL {
                        q.push(i);
                    }
                })
            };

            let consumer = {
                let q = Arc::clone(&q);
                let producer_done = Arc::clone(&producer_done);

                thread::spawn(move || {
                    let mut hits = vec![0usize; TOTAL];

                    loop {
                        // Read the flag *before* draining: once it is set, every push has
                        // already happened, so this drain collects whatever is left and the
                        // loop can stop. A lost entry then fails the assertions below
                        // instead of spinning forever.
                        let last_round = producer_done.load(Ordering::Acquire);
                        let batch = q.drain();

                        if batch.is_empty() && !last_round {
                            // Nothing to do yet; let the producer run instead of spinning hot.
                            thread::yield_now();
                        }

                        for value in batch {
                            hits[value] += 1;
                        }

                        if last_round {
                            break;
                        }
                    }

                    hits
                })
            };

            producer.join().unwrap();
            producer_done.store(true, Ordering::Release);
            let hits = consumer.join().unwrap();

            let total: usize = hits.iter().sum();
            assert_eq!(total, TOTAL);
            assert!(hits.iter().all(|&count| count == 1));
        }
    }
}