may_queue/
spmc.rs

use crossbeam_utils::{Backoff, CachePadded};
use smallvec::SmallVec;

use crate::atomic::{AtomicPtr, AtomicUsize};

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;

// number of slots in a block node
pub const BLOCK_SIZE: usize = 1 << BLOCK_SHIFT;
// mask for extracting the in-block index
pub const BLOCK_MASK: usize = BLOCK_SIZE - 1;
// log2 of the block size
pub const BLOCK_SHIFT: usize = 5;
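
// Illustrative sketch, not part of the original source: with BLOCK_SHIFT = 5 a block
// holds 32 slots, and a global index splits into an in-block slot via `index & BLOCK_MASK`
// and a block count via `index >> BLOCK_SHIFT`. A minimal sanity check of that layout:
#[cfg(test)]
#[test]
fn block_constants_layout() {
    assert_eq!(BLOCK_SIZE, 32);
    assert_eq!(BLOCK_MASK, 0b1_1111);
    // global index 70 lands in the third block (block 2), at slot 6
    let index = 70usize;
    assert_eq!(index >> BLOCK_SHIFT, 2);
    assert_eq!(index & BLOCK_MASK, 6);
}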

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Slot<T> {
    #[allow(clippy::declare_interior_mutable_const)]
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
    };
}

/// A block node contains a batch of items stored in an array.
/// This keeps malloc/free infrequent, and the array also speeds
/// up list operations.
#[repr(align(32))]
struct BlockNode<T> {
    data: [Slot<T>; BLOCK_SIZE],
    used: AtomicUsize,
    next: AtomicPtr<BlockNode<T>>,
    start: AtomicUsize, // start index of the block
}

/// We don't implement Drop for the block node; the queue is
/// responsible for dropping all the items and calls `get()`
/// to read them out for dropping.
impl<T> BlockNode<T> {
    /// create a new BlockNode with uninitialized data
    #[inline]
    fn new(index: usize) -> *mut BlockNode<T> {
        Box::into_raw(Box::new(BlockNode {
            next: AtomicPtr::new(ptr::null_mut()),
            used: AtomicUsize::new(BLOCK_SIZE),
            data: [Slot::UNINIT; BLOCK_SIZE],
            start: AtomicUsize::new(index),
        }))
    }

    /// write the value at the given index
    #[inline]
    fn set(&self, index: usize, v: T) {
        unsafe {
            let data = self.data.get_unchecked(index & BLOCK_MASK);
            data.value.get().write(MaybeUninit::new(v));
        }
    }

    /// read out the value at the given index;
    /// the returned value is dropped when it goes out of scope
    #[inline]
    fn get(&self, id: usize) -> T {
        debug_assert!(id < BLOCK_SIZE);
        unsafe {
            let data = self.data.get_unchecked(id);
            data.value.get().read().assume_init()
        }
    }

    /// mark a range of slots as read;
    /// once all slots are read the block can safely be freed
    #[inline]
    fn mark_slots_read(&self, size: usize) -> bool {
        let old = self.used.fetch_sub(size, Ordering::Relaxed);
        old == size
    }

    /// copy a range of slots out into a SmallVec
    #[inline]
    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
        let len = end - start;
        let start = start & BLOCK_MASK;
        (start..start + len).map(|id| self.get(id)).collect()
    }
}
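
// Illustrative sketch, not part of the original source: a single-block round trip using
// the private BlockNode API above. `new` pre-charges `used` with BLOCK_SIZE, so the block
// may only be reclaimed once mark_slots_read has accounted for every slot.
#[cfg(test)]
#[test]
fn block_node_roundtrip() {
    let block = BlockNode::<usize>::new(0);
    let block_ref = unsafe { &*block };
    for i in 0..BLOCK_SIZE {
        block_ref.set(i, i * 10);
    }
    // read two slots individually, then bulk-copy the rest
    assert_eq!(block_ref.get(0), 0);
    assert_eq!(block_ref.get(1), 10);
    let rest = block_ref.copy_to_bulk(2, BLOCK_SIZE);
    assert_eq!(rest.len(), BLOCK_SIZE - 2);
    assert_eq!(rest[0], 20);
    // only after all BLOCK_SIZE slots are marked read can the block be freed
    assert!(!block_ref.mark_slots_read(2));
    assert!(block_ref.mark_slots_read(BLOCK_SIZE - 2));
    let _ = unsafe { Box::from_raw(block) };
}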

/// A position in a queue.
#[derive(Debug)]
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<BlockNode<T>>,
}

impl<T> Position<T> {
    fn new(block: *mut BlockNode<T>) -> Self {
        Position {
            index: AtomicUsize::new(0),
            block: AtomicPtr::new(block),
        }
    }
}

#[derive(Debug)]
struct BlockPtr<T>(AtomicPtr<BlockNode<T>>);

impl<T> BlockPtr<T> {
    #[inline]
    fn new(block: *mut BlockNode<T>) -> Self {
        BlockPtr(AtomicPtr::new(block))
    }

    /// split a tagged pointer into the block pointer and the in-block index
    #[inline]
    fn unpack(ptr: *mut BlockNode<T>) -> (*mut BlockNode<T>, usize) {
        let ptr = ptr as usize;
        let index = ptr & BLOCK_MASK;
        let ptr = (ptr & !BLOCK_MASK) as *mut BlockNode<T>;
        (ptr, index)
    }

    /// tag a block pointer with an in-block index (relies on the block's alignment)
    #[inline]
    fn pack(ptr: *const BlockNode<T>, index: usize) -> *mut BlockNode<T> {
        ((ptr as usize) | index) as *mut BlockNode<T>
    }
}
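
// Illustrative sketch, not part of the original source: pack/unpack relies on the
// `#[repr(align(32))]` on BlockNode, which keeps the low BLOCK_SHIFT bits of every block
// pointer zero so they can carry the in-block index as a tag.
#[cfg(test)]
#[test]
fn block_ptr_pack_unpack_roundtrip() {
    let block = BlockNode::<usize>::new(0);
    // the alignment guarantee that makes the tagging scheme sound
    assert_eq!(block as usize & BLOCK_MASK, 0);
    for id in 0..BLOCK_SIZE {
        let packed = BlockPtr::pack(block, id);
        let (ptr, index) = BlockPtr::unpack(packed);
        assert_eq!(ptr, block);
        assert_eq!(index, id);
    }
    let _ = unsafe { Box::from_raw(block) };
}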

/// spmc unbounded queue
#[derive(Debug)]
pub struct Queue<T> {
    // ----------------------------------------
    // used for pop
    head: CachePadded<BlockPtr<T>>,

    // -----------------------------------------
    // used for push
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Queue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// create an spmc queue
    pub fn new() -> Self {
        let init_block = BlockNode::<T>::new(0);
        Queue {
            head: BlockPtr::new(init_block).into(),
            tail: Position::new(init_block).into(),
            _marker: PhantomData,
        }
    }

    /// push a value to the back of the queue
    pub fn push(&self, v: T) {
        let tail = unsafe { &mut *self.tail.block.unsync_load() };
        let push_index = unsafe { self.tail.index.unsync_load() };
        // store the data
        tail.set(push_index, v);
        // make sure the data is stored before the index is updated
        std::sync::atomic::fence(Ordering::Release);

        // allocate a new block node if the tail block is full
        let new_index = push_index.wrapping_add(1);
        if new_index & BLOCK_MASK == 0 {
            let new_tail = BlockNode::new(new_index);
            // when another thread accesses `next`, it has already acquired the containing node
            tail.next.store(new_tail, Ordering::Release);
            self.tail.block.store(new_tail, Ordering::Relaxed);
        }

        // commit the push
        self.tail.index.store(new_index, Ordering::Release);
    }

    /// pop from the queue; if it's empty return None
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            // commit the pop
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        // we need to check whether there is enough data
                        if pop_index >= push_index {
                            // restore the old head and return None
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }

                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else {
                        // wait until the producer has committed this slot;
                        // if nothing more is produced this would loop forever
                        while pop_index >= self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }
                    // get the data
                    let v = block.get(id);

                    if block.mark_slots_read(1) {
                        // all slots are consumed, free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// pop from the queue; if it's empty return None
    fn local_pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        // only the single producer calls this, so push_index cannot change under us
        let push_index = unsafe { self.tail.index.unsync_load() };
        let tail_block = unsafe { self.tail.block.unsync_load() };

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            // commit the pop
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        // we need to check whether there is enough data
                        if pop_index >= push_index {
                            // restore the old head and return None
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }
                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else if pop_index >= push_index {
                        // pop_index never exceeds push_index
                        assert_eq!(pop_index, push_index);
                        // advance the push index so this already reserved slot is skipped
                        self.tail.index.store(push_index + 1, Ordering::Relaxed);
                        if block.mark_slots_read(1) {
                            // all slots are consumed, free the old block
                            let _unused_block = unsafe { Box::from_raw(block) };
                        }
                        return None;
                    }

                    // get the data
                    let v = block.get(id);

                    if block.mark_slots_read(1) {
                        // all slots are consumed, free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                }
            }
        }
    }

    /// bulk pop from the queue; if it's empty return an empty SmallVec
    pub fn bulk_pop(&self) -> SmallVec<[T; BLOCK_SIZE]> {
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            let push_id = push_index & BLOCK_MASK;
            // nothing to pop if the head has caught up with the push position
            if block == tail_block && id >= push_id {
                return SmallVec::new();
            }

            let new_id = if block != tail_block { 0 } else { push_id };

            let new_head = if new_id == 0 {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            } else {
                BlockPtr::pack(block, new_id)
            };

            let block = unsafe { &mut *block };
            // only pop within a block
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;

                    let end;
                    if new_id == 0 {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        if pop_index >= push_index {
                            // restore the old head and return an empty vec
                            self.head.0.store(head, Ordering::Release);
                            return SmallVec::new();
                        }
                        end = std::cmp::min(block_start + BLOCK_SIZE, push_index);
                        let new_id = end & BLOCK_MASK;
                        if new_id == 0 {
                            let next = block.next.load(Ordering::Acquire);
                            self.head.0.store(next, Ordering::Release);
                        } else {
                            let new_head = BlockPtr::pack(block, new_id);
                            self.head.0.store(new_head, Ordering::Release);
                        }
                    } else {
                        end = block_start + new_id;
                        // wait until the producer has committed up to `end`; normally this
                        // only happens in an ABA-like race, and if nothing more is pushed
                        // this would loop forever
                        while end > self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }

                    // get the data
                    let value = block.copy_to_bulk(pop_index, end);

                    if block.mark_slots_read(end - pop_index) {
                        // all slots are consumed, free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return value;
                }
                Err(i) => {
                    head = i;
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// get the number of elements in the queue
    ///
    /// # Safety
    ///
    /// this function is unsafe because the head block may already have been destroyed
    /// when we dereference it, and in rare races the pop index can be bigger than the
    /// push index, so a wrong length may be returned
    pub unsafe fn len(&self) -> usize {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);
        let block = unsafe { &mut *block };
        // it's unsafe to deref the block because it could already be destroyed;
        // we'd better use an AtomicUsize index to calculate the length, since both
        // the tail and head here are just shadows of the real tail and head
        let block_start = block.start.load(Ordering::Relaxed);
        let pop_index = block_start + id;
        let push_index = self.tail.index.load(Ordering::Acquire);
        push_index.wrapping_sub(pop_index)
    }

    /// returns true if the queue is empty
    pub fn is_empty(&self) -> bool {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);

        let push_index = self.tail.index.load(Ordering::Acquire);
        let tail_block = self.tail.block.load(Ordering::Acquire);

        block == tail_block && id == (push_index & BLOCK_MASK)
    }
}
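
// Illustrative sketch, not part of the original source: single-threaded use of the public
// Queue API across a block boundary (more than BLOCK_SIZE items), so that head and tail
// end up in different blocks; bulk_pop drains at most one block per call.
#[cfg(test)]
#[test]
fn queue_cross_block_sketch() {
    let q = Queue::new();
    let total = BLOCK_SIZE * 2 + 3;
    for i in 0..total {
        q.push(i);
    }
    assert_eq!(unsafe { q.len() }, total);

    // the first bulk_pop stays within the first (full) block
    let first = q.bulk_pop();
    assert_eq!(first.len(), BLOCK_SIZE);
    assert_eq!(first[0], 0);

    // drain the rest one element at a time, in FIFO order
    let mut next = first.len();
    while let Some(v) = q.pop() {
        assert_eq!(v, next);
        next += 1;
    }
    assert_eq!(next, total);
    assert!(q.is_empty());
}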

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Queue::new()
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // pop all the elements to make sure the queue is empty
        while !self.bulk_pop().is_empty() {}
        let head = self.head.0.load(Ordering::Acquire);
        let (block, _id) = BlockPtr::unpack(head);
        let tail = self.tail.block.load(Ordering::Acquire);
        assert_eq!(block, tail);

        let _unused_block = unsafe { Box::from_raw(block) };
    }
}
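
// Illustrative sketch, not part of the original source: dropping a non-empty queue must
// drain and drop the remaining items, since the Drop impl above bulk_pops until empty
// before freeing the last block.
#[cfg(test)]
#[test]
fn drop_non_empty_queue() {
    let q = Queue::new();
    for i in 0..(BLOCK_SIZE + 5) {
        // use String so that leaked items would be easy to spot under a leak checker
        q.push(i.to_string());
    }
    // Drop runs here: the remaining Strings are popped and dropped, and the blocks freed
    drop(q);
}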

/// Create a new local run-queue
pub fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let inner = Arc::new(Queue::new());

    let local = Local(inner.clone());

    let remote = Steal(inner);

    (remote, local)
}

/// Producer handle. May only be used from a single thread.
pub struct Local<T: 'static>(Arc<Queue<T>>);

/// Consumer handle. May be used from many threads.
pub struct Steal<T: 'static>(Arc<Queue<T>>);

impl<T> Local<T> {
    /// Returns true if the queue has entries that can be stolen.
    #[inline]
    pub fn is_stealable(&self) -> bool {
        !self.0.is_empty()
    }

    /// Returns true if there are any entries in the queue.
    ///
    /// Kept separate from is_stealable so that refactors of is_stealable to "protect"
    /// some tasks from stealing won't affect this.
    #[inline]
    pub fn has_tasks(&self) -> bool {
        !self.0.is_empty()
    }

    /// Pushes a task to the back of the local queue.
    #[inline]
    pub fn push_back(&mut self, task: T) {
        self.0.push(task)
    }

    /// Pops a task from the local queue.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        self.0.local_pop()
    }
}

impl<T> Steal<T> {
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    // #[inline]
    // pub fn len(&self) -> usize {
    //     self.0.len()
    // }

    /// Steals a block of tasks from self and places them into `dst`.
    #[inline]
    pub fn steal_into(&self, dst: &mut Local<T>) -> Option<T> {
        // don't steal into ourselves: compare the underlying queue allocations
        if Arc::ptr_eq(&self.0, &dst.0) {
            return None;
        }
        let mut v = self.0.bulk_pop();
        let ret = v.pop();
        for t in v {
            dst.push_back(t);
        }
        ret
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}
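
// Illustrative sketch, not part of the original source: the handle pair returned by
// `local()` used on one thread; the Steal side moves a batch into a second Local queue.
// Both queues are drained before their Local owners drop, since Drop asserts emptiness.
#[cfg(test)]
#[test]
fn local_steal_sketch() {
    let (stealer, mut producer) = local();
    let (_unused_stealer, mut thief) = local();

    for i in 0..8 {
        producer.push_back(i);
    }
    assert!(producer.is_stealable());
    assert!(!stealer.is_empty());

    // steal a batch from `producer`'s queue into `thief`; one task is returned directly
    let first = stealer.steal_into(&mut thief);
    assert!(first.is_some());

    // drain both queues so the Drop impl of Local sees them empty
    while producer.pop().is_some() {}
    while thief.pop().is_some() {}
    assert!(!producer.has_tasks());
    assert!(!thief.has_tasks());
}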

#[cfg(all(nightly, test))]
mod test {
    extern crate test;
    use self::test::Bencher;
    use super::*;

    use std::thread;

    use crate::test_queue::ScBlockPop;

    impl<T> ScBlockPop<T> for super::Queue<T> {
        fn block_pop(&self) -> T {
            let backoff = Backoff::new();
            loop {
                match self.pop() {
                    Some(v) => return v,
                    None => backoff.snooze(),
                }
            }
        }
    }

    #[test]
    fn queue_sanity() {
        let q = Queue::<usize>::new();
        assert!(q.is_empty());
        for i in 0..100 {
            q.push(i);
        }
        assert_eq!(unsafe { q.len() }, 100);
        println!("{q:?}");

        for i in 0..100 {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    #[bench]
    fn single_thread_test(b: &mut Bencher) {
        let q = Queue::new();
        let mut i = 0;
        b.iter(|| {
            q.push(i);
            assert_eq!(q.pop(), Some(i));
            i += 1;
        });
    }

    #[bench]
    fn multi_1p1c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // spawn a producer thread that pushes monotonically increasing indices
            let _q = q.clone();
            thread::spawn(move || {
                for i in 0..total_work {
                    _q.push(i);
                }
            });

            for i in 0..total_work {
                let v = q.block_pop();
                assert_eq!(i, v);
            }
        });
    }

    #[bench]
    fn multi_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // fill the queue with monotonically increasing indices
            for i in 0..total_work {
                q.push(i);
            }

            let sum = AtomicUsize::new(0);
            let threads = 20;
            thread::scope(|s| {
                (0..threads).for_each(|_| {
                    s.spawn(|| {
                        let mut total = 0;
                        for _i in 0..total_work / threads {
                            total += q.block_pop();
                        }
                        sum.fetch_add(total, Ordering::Relaxed);
                    });
                });
            });
            assert!(q.is_empty());
            assert_eq!(sum.load(Ordering::Relaxed), (0..total_work).sum());
        });
    }

    #[bench]
    fn bulk_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // fill the queue with monotonically increasing indices
            for i in 0..total_work {
                q.push(i);
            }

            let total = Arc::new(AtomicUsize::new(0));

            thread::scope(|s| {
                let threads = 20;
                for _ in 0..threads {
                    let q = q.clone();
                    let total = total.clone();
                    s.spawn(move || {
                        while !q.is_empty() {
                            let v = q.bulk_pop();
                            if !v.is_empty() {
                                total.fetch_add(v.len(), Ordering::AcqRel);
                            }
                        }
                    });
                }
            });
            assert!(q.is_empty());
            assert_eq!(total.load(Ordering::Acquire), total_work);
        });
    }
}