use crossbeam_utils::{Backoff, CachePadded};
use smallvec::SmallVec;

use crate::atomic::{AtomicPtr, AtomicUsize};

use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;

/// Number of slots in a block.
pub const BLOCK_SIZE: usize = 1 << BLOCK_SHIFT;
/// Mask that extracts the in-block slot index from a global index.
pub const BLOCK_MASK: usize = BLOCK_SIZE - 1;
/// log2 of `BLOCK_SIZE`.
pub const BLOCK_SHIFT: usize = 5;

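/// One slot in a block: a possibly-uninitialized value behind an `UnsafeCell`.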
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
}

impl<T> Slot<T> {
    #[allow(clippy::declare_interior_mutable_const)]
    const UNINIT: Self = Self {
        value: UnsafeCell::new(MaybeUninit::uninit()),
    };
}

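/// A fixed-size block of slots, linked into a list by the producer.
///
/// The 32-byte alignment guarantees that the low `BLOCK_SHIFT` bits of a block
/// pointer are zero, so `BlockPtr` can pack an in-block index into them.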
#[repr(align(32))]
struct BlockNode<T> {
    /// The slots themselves.
    data: [Slot<T>; BLOCK_SIZE],
    /// Number of slots not yet consumed; the block is freed when this drops to zero.
    used: AtomicUsize,
    /// The next block in the list, once the producer has allocated it.
    next: AtomicPtr<BlockNode<T>>,
    /// Global index of the first slot in this block.
    start: AtomicUsize,
}

impl<T> BlockNode<T> {
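    /// Allocate a block whose first slot has global index `index`. `used` starts at
    /// `BLOCK_SIZE` so the block can be freed once every slot has been consumed.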
    #[inline]
    fn new(index: usize) -> *mut BlockNode<T> {
        Box::into_raw(Box::new(BlockNode {
            next: AtomicPtr::new(ptr::null_mut()),
            used: AtomicUsize::new(BLOCK_SIZE),
            data: [Slot::UNINIT; BLOCK_SIZE],
            start: AtomicUsize::new(index),
        }))
    }

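    /// Write `v` into the slot for global index `index` (only the in-block bits are used).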
    #[inline]
    fn set(&self, index: usize, v: T) {
        unsafe {
            let data = self.data.get_unchecked(index & BLOCK_MASK);
            data.value.get().write(MaybeUninit::new(v));
        }
    }

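    /// Move the value out of in-block slot `id`. The slot must have been written
    /// and must not be read again.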
    #[inline]
    fn get(&self, id: usize) -> T {
        debug_assert!(id < BLOCK_SIZE);
        unsafe {
            let data = self.data.get_unchecked(id);
            data.value.get().read().assume_init()
        }
    }

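    /// Record that `size` more slots of this block have been consumed. Returns `true`
    /// when these were the last outstanding slots, i.e. the caller should free the block.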
    #[inline]
    fn mark_slots_read(&self, size: usize) -> bool {
        let old = self.used.fetch_sub(size, Ordering::Relaxed);
        old == size
    }

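    /// Move the values for global indices `start..end` (all within this block) into a vector.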
    #[inline]
    fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
        let len = end - start;
        let start = start & BLOCK_MASK;
        (start..start + len).map(|id| self.get(id)).collect()
    }
}

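/// A producer position: the global index of the next slot to write and the block containing it.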
#[derive(Debug)]
struct Position<T> {
    /// Global index of the next slot to be written.
    index: AtomicUsize,
    /// The block containing that slot.
    block: AtomicPtr<BlockNode<T>>,
}

impl<T> Position<T> {
    fn new(block: *mut BlockNode<T>) -> Self {
        Position {
            index: AtomicUsize::new(0),
            block: AtomicPtr::new(block),
        }
    }
}

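/// The consumer head: a block pointer with the in-block index of the next slot to pop
/// packed into its low bits.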
#[derive(Debug)]
struct BlockPtr<T>(AtomicPtr<BlockNode<T>>);

impl<T> BlockPtr<T> {
    #[inline]
    fn new(block: *mut BlockNode<T>) -> Self {
        BlockPtr(AtomicPtr::new(block))
    }

    #[inline]
    fn unpack(ptr: *mut BlockNode<T>) -> (*mut BlockNode<T>, usize) {
        let ptr = ptr as usize;
        let index = ptr & BLOCK_MASK;
        let ptr = (ptr & !BLOCK_MASK) as *mut BlockNode<T>;
        (ptr, index)
    }

    #[inline]
    fn pack(ptr: *const BlockNode<T>, index: usize) -> *mut BlockNode<T> {
        ((ptr as usize) | index) as *mut BlockNode<T>
    }
}

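/// An unbounded queue built from a linked list of fixed-size blocks.
///
/// A single producer appends at the tail; any number of consumers pop from the head,
/// either one element at a time or a whole block at once via `bulk_pop`.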
#[derive(Debug)]
pub struct Queue<T> {
    /// Consumer side: the next slot to pop, packed as a block pointer plus in-block index.
    head: CachePadded<BlockPtr<T>>,
    /// Producer side: the next slot to push.
    tail: CachePadded<Position<T>>,

    _marker: PhantomData<T>,
}

unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}

impl<T> Queue<T> {
    /// Create an empty queue consisting of a single block.
    pub fn new() -> Self {
        let init_block = BlockNode::<T>::new(0);
        Queue {
            head: BlockPtr::new(init_block).into(),
            tail: Position::new(init_block).into(),
            _marker: PhantomData,
        }
    }

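    /// Push a value onto the tail. This must only be called by the single producer;
    /// the tail position is read with unsynchronized loads.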
    pub fn push(&self, v: T) {
        let tail = unsafe { &mut *self.tail.block.unsync_load() };
        let push_index = unsafe { self.tail.index.unsync_load() };
        // Write the value first, then publish the new index below.
        tail.set(push_index, v);
        std::sync::atomic::fence(Ordering::Release);

        let new_index = push_index.wrapping_add(1);
        if new_index & BLOCK_MASK == 0 {
            // The block is full: allocate and link the next block before publishing the index.
            let new_tail = BlockNode::new(new_index);
            tail.next.store(new_tail, Ordering::Release);
            self.tail.block.store(new_tail, Ordering::Relaxed);
        }

        self.tail.index.store(new_index, Ordering::Release);
    }

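    /// Pop a value from the head. Safe to call from multiple consumer threads: a slot is
    /// claimed by a CAS on the packed head pointer. When the last slot of a block is
    /// claimed, bit 63 of the head is set temporarily so competing consumers retry until
    /// the head has been moved to the next block (or restored if the queue was empty).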
    pub fn pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
            // Clear the "switching blocks" bit; if it is set in the shared head, the
            // CAS below fails and we retry until the winner has moved the head on.
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            // Claim the slot. For the last slot of a block, set bit 63 so other
            // consumers wait while we advance the head to the next block.
            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        if pop_index >= push_index {
                            // Nothing left to pop: restore the head and report empty.
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }

                        // Move the head to the next block.
                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else {
                        // Wait until the producer has published this slot.
                        while pop_index >= self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }
                    let v = block.get(id);

                    // Free the block once every one of its slots has been consumed.
                    if block.mark_slots_read(1) {
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

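    /// Pop from the head on the producer thread. Like `pop`, but the tail position is
    /// read without synchronization, and if the claimed slot has not been published yet
    /// the queue reports empty instead of waiting.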
    fn local_pop(&self) -> Option<T> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
        let push_index = unsafe { self.tail.index.unsync_load() };
        let tail_block = unsafe { self.tail.block.unsync_load() };

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            if block == tail_block && id >= (push_index & BLOCK_MASK) {
                return None;
            }

            let new_head = if id != BLOCK_MASK {
                BlockPtr::pack(block, id + 1)
            } else {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };

            let block = unsafe { &mut *block };

            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;
                    if id == BLOCK_MASK {
                        if pop_index >= push_index {
                            self.head.0.store(head, Ordering::Release);
                            return None;
                        }
                        let next = block.next.load(Ordering::Acquire);
                        self.head.0.store(next, Ordering::Release);
                    } else if pop_index >= push_index {
                        // The claimed slot has not been written yet; advance the tail index
                        // past it so head and tail stay consistent, and report empty.
                        assert_eq!(pop_index, push_index);
                        self.tail.index.store(push_index + 1, Ordering::Relaxed);
                        if block.mark_slots_read(1) {
                            let _unused_block = unsafe { Box::from_raw(block) };
                        }
                        return None;
                    }

                    let v = block.get(id);

                    if block.mark_slots_read(1) {
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return Some(v);
                }
                Err(i) => {
                    head = i;
                    backoff.spin();
                }
            }
        }
    }

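    /// Claim and return every element remaining in the current head block (at most
    /// `BLOCK_SIZE` values), in push order. Returns an empty vector when the queue is empty.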
    pub fn bulk_pop(&self) -> SmallVec<[T; BLOCK_SIZE]> {
        let mut head = self.head.0.load(Ordering::Acquire);
        let mut push_index = self.tail.index.load(Ordering::Acquire);
        let mut tail_block = self.tail.block.load(Ordering::Acquire);

        loop {
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
            let (block, id) = BlockPtr::unpack(head);
            let push_id = push_index & BLOCK_MASK;
            if block == tail_block && id >= push_id {
                return SmallVec::new();
            }

            // Claim everything up to the end of the block (new_id == 0) or, when we are
            // already in the tail block, up to the producer's position within it.
            let new_id = if block != tail_block { 0 } else { push_id };

            let new_head = if new_id == 0 {
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            } else {
                BlockPtr::pack(block, new_id)
            };

            let block = unsafe { &mut *block };
            match self.head.0.compare_exchange_weak(
                head,
                new_head,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => {
                    let block_start = block.start.load(Ordering::Relaxed);
                    let pop_index = block_start + id;

                    let end;
                    if new_id == 0 {
                        push_index = self.tail.index.load(Ordering::Acquire);
                        if pop_index >= push_index {
                            self.head.0.store(head, Ordering::Release);
                            return SmallVec::new();
                        }
                        end = std::cmp::min(block_start + BLOCK_SIZE, push_index);
                        let new_id = end & BLOCK_MASK;
                        if new_id == 0 {
                            // The whole block was consumed: move the head to the next block.
                            let next = block.next.load(Ordering::Acquire);
                            self.head.0.store(next, Ordering::Release);
                        } else {
                            let new_head = BlockPtr::pack(block, new_id);
                            self.head.0.store(new_head, Ordering::Release);
                        }
                    } else {
                        end = block_start + new_id;
                        // Wait until the producer has published every claimed slot.
                        while end > self.tail.index.load(Ordering::Acquire) {
                            std::thread::sleep(std::time::Duration::from_millis(10));
                        }
                    }

                    let value = block.copy_to_bulk(pop_index, end);

                    if block.mark_slots_read(end - pop_index) {
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
                    return value;
                }
                Err(i) => {
                    head = i;
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
            }
        }
    }

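    /// Number of elements currently in the queue.
    ///
    /// # Safety
    ///
    /// The head block is dereferenced without being claimed, so the caller must ensure
    /// no concurrent consumer can free it while `len` runs.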
    pub unsafe fn len(&self) -> usize {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);
        let block = unsafe { &mut *block };
        let block_start = block.start.load(Ordering::Relaxed);
        let pop_index = block_start + id;
        let push_index = self.tail.index.load(Ordering::Acquire);
        push_index.wrapping_sub(pop_index)
    }

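    /// Returns `true` when the consumer head has caught up with the producer tail.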
    pub fn is_empty(&self) -> bool {
        let head = self.head.0.load(Ordering::Acquire);
        let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
        let (block, id) = BlockPtr::unpack(head);

        let push_index = self.tail.index.load(Ordering::Acquire);
        let tail_block = self.tail.block.load(Ordering::Acquire);

        block == tail_block && id == (push_index & BLOCK_MASK)
    }
}

impl<T> Default for Queue<T> {
    fn default() -> Self {
        Queue::new()
    }
}

impl<T> Drop for Queue<T> {
    fn drop(&mut self) {
        // Drain any remaining elements, then free the final (now empty) block.
        while !self.bulk_pop().is_empty() {}
        let head = self.head.0.load(Ordering::Acquire);
        let (block, _id) = BlockPtr::unpack(head);
        let tail = self.tail.block.load(Ordering::Acquire);
        assert_eq!(block, tail);

        let _unused_block = unsafe { Box::from_raw(block) };
    }
}

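/// Create a work-stealing queue, returning the stealer handle and the owning local handle.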
pub fn local<T: 'static>() -> (Steal<T>, Local<T>) {
    let inner = Arc::new(Queue::new());

    let local = Local(inner.clone());

    let remote = Steal(inner);

    (remote, local)
}

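/// The owning handle: the single producer, which also pops locally from the front.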
pub struct Local<T: 'static>(Arc<Queue<T>>);

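/// A cloneable handle that lets other workers steal tasks from the owning `Local` queue.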
pub struct Steal<T: 'static>(Arc<Queue<T>>);

impl<T> Local<T> {
    /// Returns `true` if another worker could currently steal from this queue.
    #[inline]
    pub fn is_stealable(&self) -> bool {
        !self.0.is_empty()
    }

    /// Returns `true` if the local queue holds any tasks.
    #[inline]
    pub fn has_tasks(&self) -> bool {
        !self.0.is_empty()
    }

    /// Push a task onto the back of the local queue.
    #[inline]
    pub fn push_back(&mut self, task: T) {
        self.0.push(task)
    }

    /// Pop a task from the front of the local queue.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        self.0.local_pop()
    }
}

impl<T> Steal<T> {
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.0.is_empty()
    }

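    /// Steal a batch of tasks from this queue into `dst`, returning one task for the caller
    /// to run. Returns `None` when the source is empty or is the same queue as `dst`.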
    #[inline]
    pub fn steal_into(&self, dst: &mut Local<T>) -> Option<T> {
        // Never steal from our own queue.
        if Arc::ptr_eq(&self.0, &dst.0) {
            return None;
        }
        let mut v = self.0.bulk_pop();
        // Keep one task to return to the caller; everything else goes into `dst`.
        let ret = v.pop();
        for t in v {
            dst.push_back(t);
        }
        ret
    }
}

impl<T> Clone for Steal<T> {
    fn clone(&self) -> Steal<T> {
        Steal(self.0.clone())
    }
}

impl<T> Drop for Local<T> {
    fn drop(&mut self) {
        if !std::thread::panicking() {
            assert!(self.pop().is_none(), "queue not empty");
        }
    }
}

#[cfg(all(nightly, test))]
mod test {
    extern crate test;
    use self::test::Bencher;
    use super::*;

    use std::thread;

    use crate::test_queue::ScBlockPop;

    impl<T> ScBlockPop<T> for super::Queue<T> {
        fn block_pop(&self) -> T {
            let backoff = Backoff::new();
            loop {
                match self.pop() {
                    Some(v) => return v,
                    None => backoff.snooze(),
                }
            }
        }
    }

    #[test]
    fn queue_sanity() {
        let q = Queue::<usize>::new();
        assert!(q.is_empty());
        for i in 0..100 {
            q.push(i);
        }
        assert_eq!(unsafe { q.len() }, 100);
        println!("{q:?}");

        for i in 0..100 {
            assert_eq!(q.pop(), Some(i));
        }
        assert_eq!(q.pop(), None);
        assert!(q.is_empty());
    }

    #[bench]
    fn single_thread_test(b: &mut Bencher) {
        let q = Queue::new();
        let mut i = 0;
        b.iter(|| {
            q.push(i);
            assert_eq!(q.pop(), Some(i));
            i += 1;
        });
    }

    #[bench]
    fn multi_1p1c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            let _q = q.clone();
            thread::spawn(move || {
                for i in 0..total_work {
                    _q.push(i);
                }
            });

            for i in 0..total_work {
                let v = q.block_pop();
                assert_eq!(i, v);
            }
        });
    }

    #[bench]
    fn multi_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            for i in 0..total_work {
                q.push(i);
            }

            let sum = AtomicUsize::new(0);
            let threads = 20;
            thread::scope(|s| {
                (0..threads).for_each(|_| {
                    s.spawn(|| {
                        let mut total = 0;
                        for _i in 0..total_work / threads {
                            total += q.block_pop();
                        }
                        sum.fetch_add(total, Ordering::Relaxed);
                    });
                });
            });
            assert!(q.is_empty());
            assert_eq!(sum.load(Ordering::Relaxed), (0..total_work).sum::<usize>());
        });
    }

    #[bench]
    fn bulk_1p2c_test(b: &mut Bencher) {
        b.iter(|| {
            let q = Arc::new(Queue::new());
            let total_work: usize = 1_000_000;
            for i in 0..total_work {
                q.push(i);
            }

            let total = Arc::new(AtomicUsize::new(0));

            thread::scope(|s| {
                let threads = 20;
                for _ in 0..threads {
                    let q = q.clone();
                    let total = total.clone();
                    s.spawn(move || {
                        while !q.is_empty() {
                            let v = q.bulk_pop();
                            if !v.is_empty() {
                                total.fetch_add(v.len(), Ordering::AcqRel);
                            }
                        }
                    });
                }
            });
            assert!(q.is_empty());
            assert_eq!(total.load(Ordering::Acquire), total_work);
        });
    }
}