1use core::{ptr, sync::atomic};
40
41#[derive(Debug)]
59pub struct MPSCQueue<T> {
60 head: atomic::AtomicPtr<Node<T>>,
61}
62
63unsafe impl<T> Send for MPSCQueue<T> {}
64unsafe impl<T> Sync for MPSCQueue<T> {}
65
66impl<T> MPSCQueue<T> {
67 #[inline(always)]
78 pub fn is_empty(&self) -> bool {
79 self.head.load(atomic::Ordering::Acquire).is_null()
80 }
81
82 #[inline(always)]
98 pub fn push(&self, value: T) {
99 let mut head = self.head.load(atomic::Ordering::Relaxed);
100 let node = Node::new(value);
101
102 loop {
103 unsafe { (*node).next = head };
104 match self.head.compare_exchange_weak(head, node, atomic::Ordering::AcqRel, atomic::Ordering::Relaxed) {
105 Ok(_) => return,
106 Err(h) => head = h,
107 }
108 }
109 }
110
111 #[inline(always)]
141 pub fn drain(&self) -> Vec<T> {
142 let mut out = Vec::new();
143 let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::AcqRel);
144
145 while !node.is_null() {
146 let boxed = unsafe { Box::from_raw(node) };
147 node = boxed.next;
148 out.push(boxed.value);
149 }
150
151 out
152 }
153}
154
155impl<T> Default for MPSCQueue<T> {
156 fn default() -> Self {
157 Self { head: atomic::AtomicPtr::new(ptr::null_mut()) }
158 }
159}
160
161impl<T> Drop for MPSCQueue<T> {
162 fn drop(&mut self) {
163 let mut node = self.head.swap(ptr::null_mut(), atomic::Ordering::Relaxed);
164 while !node.is_null() {
165 unsafe {
166 let boxed = Box::from_raw(node);
167 node = boxed.next;
168 }
169 }
170 }
171}
172
173struct Node<T> {
174 next: *mut Node<T>,
175 value: T,
176}
177
178impl<T> Node<T> {
179 fn new(value: T) -> *mut Self {
180 Box::into_raw(Box::new(Self { next: ptr::null_mut(), value }))
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use super::*;
187 use std::sync::{Arc, Barrier};
188 use std::thread;
189
190 mod basics {
191 use super::*;
192
193 #[test]
194 fn ok_push_drain_single() {
195 let q = MPSCQueue::default();
196 q.push(1usize);
197
198 let batch = q.drain();
199 assert_eq!(batch, vec![1]);
200 }
201
202 #[test]
203 fn ok_push_drain_multiple() {
204 let q = MPSCQueue::default();
205
206 q.push(1);
207 q.push(2);
208 q.push(3);
209
210 let batch = q.drain();
211 assert_eq!(batch.len(), 3);
212 assert_eq!(batch, vec![3, 2, 1]);
213 }
214
215 #[test]
216 fn ok_drain_empty_when_queue_empty() {
217 let q: MPSCQueue<usize> = MPSCQueue::default();
218 let batch = q.drain();
219 assert!(batch.is_empty());
220 }
221 }
222
223 mod empty {
224 use super::*;
225
226 #[test]
227 fn ok_is_empty_true_on_init() {
228 let q: MPSCQueue<usize> = MPSCQueue::default();
229 assert!(q.is_empty());
230 }
231
232 #[test]
233 fn ok_is_empty_false_after_push() {
234 let q = MPSCQueue::default();
235 q.push(1);
236 assert!(!q.is_empty());
237 }
238
239 #[test]
240 fn ok_is_empty_true_after_drain() {
241 let q = MPSCQueue::default();
242
243 q.push(1);
244 q.push(2);
245
246 let _ = q.drain();
247 assert!(q.is_empty());
248 }
249 }
250
251 mod cycles {
252 use super::*;
253
254 #[test]
255 fn ok_single_push_drain_cycles() {
256 let q = MPSCQueue::default();
257 for i in 0..0x400 {
258 q.push(i);
259 let batch = q.drain();
260
261 assert_eq!(batch.len(), 1);
262 assert_eq!(batch[0], i);
263 }
264 }
265
266 #[test]
267 fn ok_multi_push_drain_cycles() {
268 let q = MPSCQueue::default();
269 for _ in 0..0x200 {
270 for i in 0..0x0A {
271 q.push(i);
272 }
273
274 let batch = q.drain();
275 assert_eq!(batch.len(), 0x0A);
276 }
277 }
278 }
279
280 mod concurrency {
281 use super::*;
282
283 const THREADS: usize = 0x0A;
284 const ITERS: usize = 0x2000;
285
286 #[test]
287 fn ok_multi_tx_push() {
288 let q = Arc::new(MPSCQueue::default());
289
290 let mut handles = Vec::new();
291 for _ in 0..THREADS {
292 let q = q.clone();
293 handles.push(thread::spawn(move || {
294 for i in 0..ITERS {
295 q.push(i);
296 }
297 }));
298 }
299
300 for h in handles {
301 h.join().unwrap();
302 }
303
304 let batch = q.drain();
305 assert_eq!(batch.len(), THREADS * ITERS);
306 }
307
308 #[test]
309 fn ok_multi_tx_push_high_contention() {
310 let q = Arc::new(MPSCQueue::default());
311 let barrier = Arc::new(Barrier::new(THREADS * 2));
312
313 let mut handles = Vec::new();
314
315 for _ in 0..(THREADS * 2) {
316 let q = q.clone();
317 let barrier = barrier.clone();
318
319 handles.push(thread::spawn(move || {
320 barrier.wait();
321
322 for i in 0..(ITERS * 2) {
323 q.push(i);
324 }
325 }));
326 }
327
328 for h in handles {
329 h.join().unwrap();
330 }
331
332 let batch = q.drain();
333 assert_eq!(batch.len(), (THREADS * 2) * (ITERS * 2));
334 }
335
336 #[test]
337 fn ok_multi_tx_push_drain() {
338 let q = Arc::new(MPSCQueue::default());
339 let producer = {
340 let q = q.clone();
341 thread::spawn(move || {
342 for i in 0..0x8000 {
343 q.push(i);
344 }
345 })
346 };
347
348 let consumer = {
349 let q = q.clone();
350 thread::spawn(move || {
351 let mut total = 0usize;
352 while total < 0x8000 {
353 let batch = q.drain();
354 total += batch.len();
355 }
356
357 total
358 })
359 };
360
361 producer.join().unwrap();
362 let total = consumer.join().unwrap();
363
364 assert_eq!(total, 0x8000);
365 }
366 }
367}