1use core::{ptr, sync::atomic};
40
41#[derive(Debug)]
59pub struct MPSCQueue<T> {
60 head: atomic::AtomicPtr<Node<T>>,
61}
62
63unsafe impl<T> Send for MPSCQueue<T> {}
64unsafe impl<T> Sync for MPSCQueue<T> {}
65
66impl<T> MPSCQueue<T> {
67 #[inline(always)]
78 pub fn is_empty(&self) -> bool {
79 self.head.load(atomic::Ordering::Acquire).is_null()
80 }
81
82 #[inline(always)]
98 pub fn push(&self, value: T) {
99 let mut head = self.head.load(atomic::Ordering::Relaxed);
100 let node = Node::new(value);
101
102 loop {
103 unsafe { (*node).next = head };
104 match self.head.compare_exchange_weak(
105 head,
106 node,
107 atomic::Ordering::AcqRel,
108 atomic::Ordering::Relaxed,
109 ) {
110 Ok(_) => return,
111 Err(h) => head = h,
112 }
113 }
114 }
115
116 #[inline(always)]
146 pub fn drain(&self) -> Vec<T> {
147 let mut out = Vec::new();
148 let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
149
150 while !node.is_null() {
151 let boxed = unsafe { Box::from_raw(node) };
152 node = boxed.next;
153 out.push(boxed.value);
154 }
155
156 out
157 }
158}
159
160impl<T> Default for MPSCQueue<T> {
161 fn default() -> Self {
162 Self { head: atomic::AtomicPtr::new(ptr::null_mut()) }
163 }
164}
165
166impl<T> Drop for MPSCQueue<T> {
167 fn drop(&mut self) {
168 let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::Relaxed);
169 while !node.is_null() {
170 unsafe {
171 let boxed = Box::from_raw(node);
172 node = boxed.next;
173 }
174 }
175 }
176}
177
178struct Node<T> {
179 next: *mut Node<T>,
180 value: T,
181}
182
183impl<T> Node<T> {
184 fn new(value: T) -> *mut Self {
185 Box::into_raw(Box::new(Self { next: ptr::null_mut(), value }))
186 }
187}
188
189#[cfg(test)]
190mod tests {
191 use super::*;
192 use std::sync::{Arc, Barrier};
193 use std::thread;
194
195 mod basics {
196 use super::*;
197
198 #[test]
199 fn ok_push_drain_single() {
200 let q = MPSCQueue::default();
201 q.push(1usize);
202
203 let batch = q.drain();
204 assert_eq!(batch, vec![1]);
205 }
206
207 #[test]
208 fn ok_push_drain_multiple() {
209 let q = MPSCQueue::default();
210
211 q.push(1);
212 q.push(2);
213 q.push(3);
214
215 let batch = q.drain();
216 assert_eq!(batch.len(), 3);
217 assert_eq!(batch, vec![3, 2, 1]);
218 }
219
220 #[test]
221 fn ok_drain_empty_when_queue_empty() {
222 let q: MPSCQueue<usize> = MPSCQueue::default();
223 let batch = q.drain();
224 assert!(batch.is_empty());
225 }
226 }
227
228 mod empty {
229 use super::*;
230
231 #[test]
232 fn ok_is_empty_true_on_init() {
233 let q: MPSCQueue<usize> = MPSCQueue::default();
234 assert!(q.is_empty());
235 }
236
237 #[test]
238 fn ok_is_empty_false_after_push() {
239 let q = MPSCQueue::default();
240 q.push(1);
241 assert!(!q.is_empty());
242 }
243
244 #[test]
245 fn ok_is_empty_true_after_drain() {
246 let q = MPSCQueue::default();
247
248 q.push(1);
249 q.push(2);
250
251 let _ = q.drain();
252 assert!(q.is_empty());
253 }
254 }
255
256 mod cycles {
257 use super::*;
258
259 #[test]
260 fn ok_single_push_drain_cycles() {
261 let q = MPSCQueue::default();
262 for i in 0..0x400 {
263 q.push(i);
264 let batch = q.drain();
265
266 assert_eq!(batch.len(), 1);
267 assert_eq!(batch[0], i);
268 }
269 }
270
271 #[test]
272 fn ok_multi_push_drain_cycles() {
273 let q = MPSCQueue::default();
274 for _ in 0..0x200 {
275 for i in 0..0x0A {
276 q.push(i);
277 }
278
279 let batch = q.drain();
280 assert_eq!(batch.len(), 0x0A);
281 }
282 }
283 }
284
285 mod concurrency {
286 use super::*;
287
288 const THREADS: usize = 0x0A;
289 const ITERS: usize = 0x2000;
290
291 #[test]
292 fn ok_multi_tx_push() {
293 let q = Arc::new(MPSCQueue::default());
294
295 let mut handles = Vec::new();
296 for _ in 0..THREADS {
297 let q = q.clone();
298 handles.push(thread::spawn(move || {
299 for i in 0..ITERS {
300 q.push(i);
301 }
302 }));
303 }
304
305 for h in handles {
306 h.join().unwrap();
307 }
308
309 let batch = q.drain();
310 assert_eq!(batch.len(), THREADS * ITERS);
311 }
312
313 #[test]
314 fn ok_multi_tx_push_high_contention() {
315 let q = Arc::new(MPSCQueue::default());
316 let barrier = Arc::new(Barrier::new(THREADS * 2));
317
318 let mut handles = Vec::new();
319
320 for _ in 0..(THREADS * 2) {
321 let q = q.clone();
322 let barrier = barrier.clone();
323
324 handles.push(thread::spawn(move || {
325 barrier.wait();
326
327 for i in 0..(ITERS * 2) {
328 q.push(i);
329 }
330 }));
331 }
332
333 for h in handles {
334 h.join().unwrap();
335 }
336
337 let batch = q.drain();
338 assert_eq!(batch.len(), (THREADS * 2) * (ITERS * 2));
339 }
340
341 #[test]
342 fn ok_multi_tx_push_drain() {
343 let q = Arc::new(MPSCQueue::default());
344 let producer = {
345 let q = q.clone();
346 thread::spawn(move || {
347 for i in 0..0x8000 {
348 q.push(i);
349 }
350 })
351 };
352
353 let consumer = {
354 let q = q.clone();
355 thread::spawn(move || {
356 let mut total = 0usize;
357 while total < 0x8000 {
358 let batch = q.drain();
359 total += batch.len();
360 }
361
362 total
363 })
364 };
365
366 producer.join().unwrap();
367 let total = consumer.join().unwrap();
368
369 assert_eq!(total, 0x8000);
370 }
371 }
372}