may_queue 0.1.23

May's internal queue library
use crossbeam_utils::{Backoff, CachePadded};
use smallvec::SmallVec;

use crate::atomic::{AtomicPtr, AtomicUsize};

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;

// number of items stored in one block node
pub const BLOCK_SIZE: usize = 1 << BLOCK_SHIFT;
// mask to extract the slot index within a block
pub const BLOCK_MASK: usize = BLOCK_SIZE - 1;
// shift that determines the block size
pub const BLOCK_SHIFT: usize = 5;
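
// Conceptually, a monotonically increasing queue index `i` lives in the
// (i >> BLOCK_SHIFT)-th block at slot `i & BLOCK_MASK`; e.g. with
// BLOCK_SHIFT = 5, index 70 is slot 6 of the third block (70 = 2 * 32 + 6).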

/// A slot in a block.
struct Slot<T> {
    /// The value.
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Slot<T> {
    #[allow(clippy::declare_interior_mutable_const)]
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
    };
}

/// a block node contains a batch of items stored in an array;
/// this keeps malloc/free infrequent, and the array layout
/// speeds up list operations
#[repr(align(32))]
struct BlockNode<T> {
    data: [Slot<T>; BLOCK_SIZE],
    used: AtomicUsize,
    next: AtomicPtr<BlockNode<T>>,
    start: AtomicUsize, // start index of the block
}

/// we don't implement Drop for the block node; the queue is
/// responsible for dropping all remaining items and calls the
/// node's get() method to do so
impl<T> BlockNode<T> {
    /// create a new BlockNode with uninitialized data
    #[inline]
    fn new(index: usize) -> *mut BlockNode<T> {
        Box::into_raw(Box::new(BlockNode {
            next: AtomicPtr::new(ptr::null_mut()),
            used: AtomicUsize::new(BLOCK_SIZE),
            data: [Slot::UNINIT; BLOCK_SIZE],
            start: AtomicUsize::new(index),
        }))
    }

    /// write the value at the given index
    #[inline]
    fn set(&self, index: usize, v: T) {
        unsafe {
            let data = self.data.get_unchecked(index & BLOCK_MASK);
            data.value.get().write(MaybeUninit::new(v));
        }
    }

    /// read out the value at the given index
    /// the caller takes ownership, so the value is dropped when it goes out of scope
    #[inline]
    fn get(&self, id: usize) -> T {
        debug_assert!(id < BLOCK_SIZE);
        unsafe {
            let data = self.data.get_unchecked(id);
            data.value.get().read().assume_init()
        }
    }

    /// mark a range of slots as read
    /// returns true once all slots in the block have been read, so the block can be safely freed
    #[inline]
    fn mark_slots_read(&self, size: usize) -> bool {
        let old = self.used.fetch_sub(size, Ordering::Relaxed);
        old == size
    }

    #[inline]
    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
        let len = end - start;
        let start = start & BLOCK_MASK;
        (start..start + len).map(|id| self.get(id)).collect()
    }
}
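
// A minimal sketch (not in the original source) of the reclamation rule:
// `used` starts at BLOCK_SIZE and `mark_slots_read` returns true only for
// the reader that retires the last outstanding slot, so exactly one
// consumer frees the block.
#[cfg(test)]
mod block_node_sketch {
    use super::*;

    #[test]
    fn last_reader_frees_block() {
        let ptr = BlockNode::<usize>::new(0);
        let block = unsafe { &*ptr };
        for i in 0..BLOCK_SIZE {
            block.set(i, i);
        }
        // only the call that retires the final outstanding slot reports true
        assert!(!block.mark_slots_read(BLOCK_SIZE - 1));
        assert!(block.mark_slots_read(1));
        // free the block allocated by BlockNode::new
        let _ = unsafe { Box::from_raw(ptr) };
    }
}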

/// A position in a queue.
#[derive(Debug)]
struct Position<T> {
    /// The index in the queue.
    index: AtomicUsize,

    /// The block in the linked list.
    block: AtomicPtr<BlockNode<T>>,
}

impl<T> Position<T> {
    fn new(block: *mut BlockNode<T>) -> Self {
        Position {
            index: AtomicUsize::new(0),
            block: AtomicPtr::new(block),
        }
    }
}

#[derive(Debug)]
struct BlockPtr<T>(AtomicPtr<BlockNode<T>>);

impl<T> BlockPtr<T> {
    #[inline]
    fn new(block: *mut BlockNode<T>) -> Self {
        BlockPtr(AtomicPtr::new(block))
    }

    #[inline]
    fn unpack(ptr: *mut BlockNode<T>) -> (*mut BlockNode<T>, usize) {
        let ptr = ptr as usize;
        let index = ptr & BLOCK_MASK;
        let ptr = (ptr & !BLOCK_MASK) as *mut BlockNode<T>;
        (ptr, index)
    }

    #[inline]
    fn pack(ptr: *const BlockNode<T>, index: usize) -> *mut BlockNode<T> {
        ((ptr as usize) | index) as *mut BlockNode<T>
    }
}
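
// A minimal sketch (not in the original source) of why pointer packing works:
// `BlockNode` is `#[repr(align(32))]`, so the low BLOCK_SHIFT bits of any
// block pointer are zero and can carry a slot index in 0..BLOCK_SIZE.
#[cfg(test)]
mod block_ptr_sketch {
    use super::*;

    #[test]
    fn pack_unpack_round_trip() {
        let ptr = BlockNode::<usize>::new(0);
        for index in 0..BLOCK_SIZE {
            let packed = BlockPtr::pack(ptr, index);
            let (unpacked, id) = BlockPtr::unpack(packed);
            assert_eq!(unpacked, ptr);
            assert_eq!(id, index);
        }
        // free the block allocated by BlockNode::new
        let _ = unsafe { Box::from_raw(ptr) };
    }
}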

/// spmc unbounded queue
#[derive(Debug)]
pub struct Queue<T> {
    // ----------------------------------------
    // use for pop
    head: CachePadded<BlockPtr<T>>,

    // -----------------------------------------
    // use for push
    tail: CachePadded<Position<T>>,

    /// Indicates that dropping a `Queue<T>` may drop values of type `T`.
    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// create an spmc queue
    pub fn new() -> Self {
        let init_block = BlockNode::<T>::new(0);
        Queue {
            head: BlockPtr::new(init_block).into(),
            tail: Position::new(init_block).into(),
            _marker: PhantomData,
        }
    }

    /// push a value to the back of the queue
    pub fn push(&self, v: T) {
        let tail = unsafe { &mut *self.tail.block.unsync_load() };
        let push_index = unsafe { self.tail.index.unsync_load() };
        // store the data
        tail.set(push_index, v);
        // this fence makes sure the value is stored before the index is published
        std::sync::atomic::fence(Ordering::Release);

        // allocate a new block node if the tail block is full
        let new_index = push_index.wrapping_add(1);
        if new_index & BLOCK_MASK == 0 {
            let new_tail = BlockNode::new(new_index);
            // consumers load `next` with Acquire, so they observe the fully initialized node
            tail.next.store(new_tail, Ordering::Release);
            self.tail.block.store(new_tail, Ordering::Relaxed);
        }

        // commit the push
        self.tail.index.store(new_index, Ordering::Release);
    }

    /// pop a value from the queue; returns None if it's empty
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
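            // the top bit of the packed head marks a consumer that is advancing
            // the head to the next block; clear the mark before unpacking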
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            // commit the pop
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        // we need to check if there is enough data
                        if pop_index >= push_index {
                            // recover the old head, and return None
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }

                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else {
                        // we have to wait until there is enough data
                        // if nothing more is produced, this will loop forever
                        while pop_index >= self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }
                    // get the data
                    let v = block.get(id);

                    if block.mark_slots_read(1) {
                        // we need to free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// pop from the queue (owner thread only); returns None if it's empty
    fn local_pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        // this is only used for the local pop, so we can be sure push_index is not changed concurrently
        let push_index = unsafe { self.tail.index.unsync_load() };
        let tail_block = unsafe { self.tail.block.unsync_load() };

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            // commit the pop
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        // we need to check if there is enough data
                        if pop_index >= push_index {
                            // recover the old head, and return None
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }
                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else if pop_index >= push_index {
                        // pop_index never exceeds push_index
                        assert_eq!(pop_index, push_index);
                        // advance the push index so this unwritten slot is skipped
                        self.tail.index.store(push_index + 1, Ordering::Relaxed);
                        if block.mark_slots_read(1) {
                            // we need to free the old block
                            let _unused_block = unsafe { Box::from_raw(block) };
                        }
                        return None;
                    }

                    // get the data
                    let v = block.get(id);

                    if block.mark_slots_read(1) {
                        // we need to free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                }
            }
        }
    }

    /// pop a batch of values from the queue; returns an empty vec if it's empty
    pub fn bulk_pop(&self) -> SmallVec<[T; BLOCK_SIZE]> {
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            let push_id = push_index & BLOCK_MASK;
            // nothing to pop: the head caught up with the tail within the same block
            if block == tail_block && id >= push_id {
                return SmallVec::new();
            }

            let new_id = if block != tail_block { 0 } else { push_id };

            let new_head = if new_id == 0 {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            } else {
                BlockPtr::pack(block, new_id)
            };

            let block = unsafe { &mut *block };
            // only pop within a block
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;

                    let end;
                    if new_id == 0 {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        if pop_index >= push_index {
                            // recover the old head, and return an empty vec
                            self.head.0.store(head, Ordering::Release);
                            return SmallVec::new();
                        }
                        end = std::cmp::min(block_start + BLOCK_SIZE, push_index);
                        let new_id = end & BLOCK_MASK;
                        if new_id == 0 {
                            let next = block.next.load(Ordering::Acquire);
                            self.head.0.store(next, Ordering::Release);
                        } else {
                            let new_head = BlockPtr::pack(block, new_id);
                            self.head.0.store(new_head, Ordering::Release);
                        }
                    } else {
                        end = block_start + new_id;
                        // we have to wait until there is enough data; normally this doesn't
                        // happen except in an ABA situation
                        // if nothing more is pushed, this will loop forever
                        while end > self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }

                    // get the data
                    let value = block.copy_to_bulk(pop_index, end);

                    if block.mark_slots_read(end - pop_index) {
                        // we need to free the old block
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return value;
                }
                Err(i) => {
                    head = i;
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

    /// get the size of the queue
    ///
    /// # Safety
    ///
    /// this function is unsafe because the head block may already have been
    /// destroyed, and in rare cases the pop index can exceed the push index,
    /// causing a wrong length to be returned
    pub unsafe fn len(&self) -> usize {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);
        let block = unsafe { &mut *block };
        // it's unsafe to deref the block because it may already be destroyed;
        // it would be better to track the length with an AtomicUsize, since
        // the head and tail here are only snapshots of the real positions
        let block_start = block.start.load(Ordering::Relaxed);
        let pop_index = block_start + id;
        let push_index = self.tail.index.load(Ordering::Acquire);
        push_index.wrapping_sub(pop_index)
    }

    /// if the queue is empty
    pub fn is_empty(&self) -> bool {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);

        let push_index = self.tail.index.load(Ordering::Acquire);
        let tail_block = self.tail.block.load(Ordering::Acquire);

        block == tail_block && id == (push_index & BLOCK_MASK)
    }
}
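
// A minimal usage sketch (illustrative only, not part of the original source):
// a single producer pushes while any thread may call `pop`/`bulk_pop`.
//
//     let q = Queue::new();
//     q.push(1);
//     q.push(2);
//     assert_eq!(q.pop(), Some(1));
//     let rest = q.bulk_pop(); // drains the rest of the current block
//     assert_eq!(rest.as_slice(), &[2]);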

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Queue::new()
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // pop all the elements to make sure the queue is empty
        while !self.bulk_pop().is_empty() {}
        let head = self.head.0.load(Ordering::Acquire);
        let (block, _id) = BlockPtr::unpack(head);
        let tail = self.tail.block.load(Ordering::Acquire);
        assert_eq!(block, tail);

        let _unused_block = unsafe { Box::from_raw(block) };
    }
}

/// Create a new local run-queue
pub fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let inner = Arc::new(Queue::new());

    let local = Local(inner.clone());

    let remote = Steal(inner);

    (remote, local)
}

/// Producer handle. May only be used from a single thread.
pub struct Local<T: 'static>(Arc<Queue<T>>);

/// Consumer handle. May be used from many threads.
pub struct Steal<T: 'static>(Arc<Queue<T>>);

impl<T> Local<T> {
    /// Returns true if the queue has entries that can be stolen.
    #[inline]
    pub fn is_stealable(&self) -> bool {
        !self.0.is_empty()
    }

    /// Returns true if there are any entries in the queue
    ///
    /// Separate from is_stealable so that refactors of is_stealable to "protect"
    /// some tasks from stealing won't affect this
    #[inline]
    pub fn has_tasks(&self) -> bool {
        !self.0.is_empty()
    }

    /// Pushes a task to the back of the local queue
    #[inline]
    pub fn push_back(&mut self, task: T) {
        self.0.push(task)
    }

    /// Pops a task from the local queue.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        self.0.local_pop()
    }
}

impl<T> Steal<T> {
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

    // #[inline]
    // pub fn len(&self) -> usize {
    //     self.0.len()
    // }

    /// Steals a block of tasks from self and places them into `dst`.
    #[inline]
    pub fn steal_into(&self, dst: &mut Local<T>) -> Option<T> {
        // don't steal from ourselves; `std::ptr::eq` on the `Arc` fields compares
        // field addresses, never the shared queue, so use `Arc::ptr_eq` instead
        if Arc::ptr_eq(&self.0, &dst.0) {
            return None;
        }
        let mut v = self.0.bulk_pop();
        let ret = v.pop();
        for t in v {
            dst.push_back(t);
        }
        ret
    }
}
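
// A minimal work-stealing sketch (illustrative test, not part of the original
// source): the owner pushes locally while another queue's `Steal` handle
// takes a batch via `steal_into`.
#[cfg(test)]
mod steal_sketch {
    use super::*;

    #[test]
    fn steal_into_moves_tasks() {
        let (stealer, mut owner) = local::<usize>();
        let (_, mut thief) = local::<usize>();
        for i in 0..8 {
            owner.push_back(i);
        }
        // the thief gets one task returned directly; the rest land in its queue
        let first = stealer.steal_into(&mut thief);
        assert!(first.is_some());
        // drain both queues so the Local drop assertion passes
        let mut stolen = 1;
        while thief.pop().is_some() {
            stolen += 1;
        }
        assert_eq!(stolen, 8);
        while owner.pop().is_some() {}
    }
}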

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

#[cfg(all(nightly, test))]
mod test {
    extern crate test;
    use self::test::Bencher;
    use super::*;

    use std::thread;

    use crate::test_queue::ScBlockPop;

    impl<T> ScBlockPop<T> for super::Queue<T> {
        fn block_pop(&self) -> T {
            let backoff = Backoff::new();
            loop {
                match self.pop() {
                    Some(v) => return v,
                    None => backoff.snooze(),
                }
            }
        }
    }

    #[test]
    fn queue_sanity() {
        let q = Queue::<usize>::new();
        assert!(q.is_empty());
        for i in 0..100 {
            q.push(i);
        }
        assert_eq!(unsafe { q.len() }, 100);
        println!("{q:?}");

        for i in 0..100 {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    #[bench]
    fn single_thread_test(b: &mut Bencher) {
        let q = Queue::new();
        let mut i = 0;
        b.iter(|| {
            q.push(i);
            assert_eq!(q.pop(), Some(i));
            i += 1;
        });
    }

    #[bench]
    fn multi_1p1c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // spawn a producer thread that pushes monotonically increasing indices
            let _q = q.clone();
            thread::spawn(move || {
                for i in 0..total_work {
                    _q.push(i);
                }
            });

            for i in 0..total_work {
                let v = q.block_pop();
                assert_eq!(i, v);
            }
        });
    }

    #[bench]
    fn multi_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // push monotonically increasing indices, then consume them from multiple threads
            for i in 0..total_work {
                q.push(i);
            }

            let sum = AtomicUsize::new(0);
            let threads = 20;
            thread::scope(|s| {
                (0..threads).for_each(|_| {
                    s.spawn(|| {
                        let mut total = 0;
                        for _i in 0..total_work / threads {
                            total += q.block_pop();
                        }
                        sum.fetch_add(total, Ordering::Relaxed);
                    });
                });
            });
            assert!(q.is_empty());
            assert_eq!(sum.load(Ordering::Relaxed), (0..total_work).sum());
        });
    }

    #[bench]
    fn bulk_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            // push monotonically increasing indices, then bulk-pop them from multiple threads
            for i in 0..total_work {
                q.push(i);
            }

            let total = Arc::new(AtomicUsize::new(0));

            thread::scope(|s| {
                let threads = 20;
                for _ in 0..threads {
                    let q = q.clone();
                    let total = total.clone();
                    s.spawn(move || {
                        while !q.is_empty() {
                            let v = q.bulk_pop();
                            if !v.is_empty() {
                                total.fetch_add(v.len(), Ordering::AcqRel);
                            }
                        }
                    });
                }
            });
            assert!(q.is_empty());
            assert_eq!(total.load(Ordering::Acquire), total_work);
        });
    }
}