use crossbeam_utils::{Backoff, CachePadded};
use smallvec::SmallVec;
use crate::atomic::{AtomicPtr, AtomicUsize};
use std::cell::UnsafeCell;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::Ordering;
use std::sync::Arc;
/// Shift that determines the block size: blocks hold `1 << BLOCK_SHIFT` slots.
pub const BLOCK_SHIFT: usize = 5;
/// Number of slots in each block.
pub const BLOCK_SIZE: usize = 1 << BLOCK_SHIFT;
/// Mask covering the in-block slot index bits.
pub const BLOCK_MASK: usize = BLOCK_SIZE - 1;
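/// A queue slot holding a possibly-uninitialized value.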
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Slot<T> {
#[allow(clippy::declare_interior_mutable_const)]
const UNINIT: Self = Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
};
}
// Aligned to BLOCK_SIZE so the low BLOCK_SHIFT bits of a block pointer are
// always zero and can carry an in-block slot index (see `BlockPtr`).
#[repr(align(32))]
struct BlockNode<T> {
    data: [Slot<T>; BLOCK_SIZE],
    /// Slots not yet consumed; the reader taking the last one frees the block.
    used: AtomicUsize,
    /// Next block in the list, if already allocated.
    next: AtomicPtr<BlockNode<T>>,
    /// Absolute index of this block's first slot.
    start: AtomicUsize,
}
impl<T> BlockNode<T> {
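    /// Allocates a block whose first slot has absolute index `index`, with
    /// all `BLOCK_SIZE` slots initially marked unread.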
#[inline]
fn new(index: usize) -> *mut BlockNode<T> {
Box::into_raw(Box::new(BlockNode {
next: AtomicPtr::new(ptr::null_mut()),
used: AtomicUsize::new(BLOCK_SIZE),
data: [Slot::UNINIT; BLOCK_SIZE],
start: AtomicUsize::new(index),
}))
}
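    /// Writes `v` into the slot for `index`, masked into this block.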
#[inline]
fn set(&self, index: usize, v: T) {
unsafe {
let data = self.data.get_unchecked(index & BLOCK_MASK);
data.value.get().write(MaybeUninit::new(v));
}
}
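    /// Reads the value out of slot `id`. Each written slot must be read
    /// exactly once, and only after the producer has published it.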
#[inline]
fn get(&self, id: usize) -> T {
debug_assert!(id < BLOCK_SIZE);
unsafe {
let data = self.data.get_unchecked(id);
data.value.get().read().assume_init()
}
}
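    /// Marks `size` slots as consumed; returns true if these were the last
    /// live slots, meaning the caller must free the block.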
#[inline]
fn mark_slots_read(&self, size: usize) -> bool {
let old = self.used.fetch_sub(size, Ordering::Relaxed);
old == size
}
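    /// Copies the values at absolute indices `start..end` (all within this
    /// block) out in order.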
#[inline]
fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
let len = end - start;
let start = start & BLOCK_MASK;
(start..start + len).map(|id| self.get(id)).collect()
}
}
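/// The producer-side cursor: the next absolute slot index and the block it
/// falls in.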
#[derive(Debug)]
struct Position<T> {
index: AtomicUsize,
block: AtomicPtr<BlockNode<T>>,
}
impl<T> Position<T> {
fn new(block: *mut BlockNode<T>) -> Self {
Position {
index: AtomicUsize::new(0),
block: AtomicPtr::new(block),
}
}
}
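/// A consumer-side pointer that packs a block address with an in-block slot
/// index in the low bits freed up by the block's alignment.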
#[derive(Debug)]
struct BlockPtr<T>(AtomicPtr<BlockNode<T>>);
impl<T> BlockPtr<T> {
#[inline]
fn new(block: *mut BlockNode<T>) -> Self {
BlockPtr(AtomicPtr::new(block))
}
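    /// Splits a packed pointer into the block address and the slot index.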
#[inline]
fn unpack(ptr: *mut BlockNode<T>) -> (*mut BlockNode<T>, usize) {
let ptr = ptr as usize;
let index = ptr & BLOCK_MASK;
let ptr = (ptr & !BLOCK_MASK) as *mut BlockNode<T>;
(ptr, index)
}
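    /// Packs a block address and a slot index (< BLOCK_SIZE) into one pointer.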
#[inline]
fn pack(ptr: *const BlockNode<T>, index: usize) -> *mut BlockNode<T> {
((ptr as usize) | index) as *mut BlockNode<T>
}
}
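/// An unbounded SPMC queue built from a linked list of fixed-size blocks:
/// a single producer writes at `tail`, while any number of consumers advance
/// the packed `head` pointer.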
#[derive(Debug)]
pub struct Queue<T> {
head: CachePadded<BlockPtr<T>>,
tail: CachePadded<Position<T>>,
_marker: PhantomData<T>,
}
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Queue<T> {
pub fn new() -> Self {
let init_block = BlockNode::<T>::new(0);
Queue {
head: BlockPtr::new(init_block).into(),
tail: Position::new(init_block).into(),
_marker: PhantomData,
}
}
    /// Pushes a value at the tail.
    ///
    /// Only a single producer thread may call this: the tail position is
    /// read with unsynchronized loads.
    pub fn push(&self, v: T) {
        let tail = unsafe { &mut *self.tail.block.unsync_load() };
        let push_index = unsafe { self.tail.index.unsync_load() };
        tail.set(push_index, v);
        // Publish the slot write before the new index becomes visible.
        std::sync::atomic::fence(Ordering::Release);
        let new_index = push_index.wrapping_add(1);
        if new_index & BLOCK_MASK == 0 {
            // The block is full: allocate and link the next one.
            let new_tail = BlockNode::new(new_index);
            tail.next.store(new_tail, Ordering::Release);
            self.tail.block.store(new_tail, Ordering::Relaxed);
        }
        self.tail.index.store(new_index, Ordering::Release);
    }
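    /// Pops one value from the head; safe to call from many threads at once.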
pub fn pop(&self) -> Option<T> {
let backoff = Backoff::new();
let mut head = self.head.0.load(Ordering::Acquire);
let mut push_index = self.tail.index.load(Ordering::Acquire);
let mut tail_block = self.tail.block.load(Ordering::Acquire);
        loop {
            // Bit 63 (assuming a 64-bit target) marks a head in transition to
            // the next block; strip it so unpacking yields a valid pointer.
            head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
let (block, id) = BlockPtr::unpack(head);
if block == tail_block && id >= (push_index & BLOCK_MASK) {
return None;
}
let new_head = if id != BLOCK_MASK {
BlockPtr::pack(block, id + 1)
            } else {
                // Claiming the last slot: set the transition bit so concurrent
                // poppers spin until `next` is installed as the new head.
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            };
let block = unsafe { &mut *block };
match self.head.0.compare_exchange_weak(
head,
new_head,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let block_start = block.start.load(Ordering::Relaxed);
let pop_index = block_start + id;
if id == BLOCK_MASK {
push_index = self.tail.index.load(Ordering::Acquire);
if pop_index >= push_index {
self.head.0.store(head, Ordering::Release);
return None;
}
let next = block.next.load(Ordering::Acquire);
self.head.0.store(next, Ordering::Release);
                    } else {
                        // The slot was claimed before the producer published
                        // it; back off until the write becomes visible.
                        while pop_index >= self.tail.index.load(Ordering::Acquire) {
                            backoff.snooze();
                        }
                    }
let v = block.get(id);
                    if block.mark_slots_read(1) {
                        // The reader consuming the last slot frees the block.
                        let _unused_block = unsafe { Box::from_raw(block) };
                    }
return Some(v);
}
Err(i) => {
head = i;
backoff.spin();
push_index = self.tail.index.load(Ordering::Acquire);
tail_block = self.tail.block.load(Ordering::Acquire);
}
}
}
}
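    /// Pops from the head on the owning (producer) thread, where the tail
    /// position can be read with unsynchronized loads.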
fn local_pop(&self) -> Option<T> {
let backoff = Backoff::new();
let mut head = self.head.0.load(Ordering::Acquire);
let push_index = unsafe { self.tail.index.unsync_load() };
let tail_block = unsafe { self.tail.block.unsync_load() };
loop {
head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
let (block, id) = BlockPtr::unpack(head);
if block == tail_block && id >= (push_index & BLOCK_MASK) {
return None;
}
let new_head = if id != BLOCK_MASK {
BlockPtr::pack(block, id + 1)
} else {
(head as usize | (1 << 63)) as *mut BlockNode<T>
};
let block = unsafe { &mut *block };
match self.head.0.compare_exchange_weak(
head,
new_head,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let block_start = block.start.load(Ordering::Relaxed);
let pop_index = block_start + id;
if id == BLOCK_MASK {
if pop_index >= push_index {
self.head.0.store(head, Ordering::Release);
return None;
}
let next = block.next.load(Ordering::Acquire);
self.head.0.store(next, Ordering::Release);
                    } else if pop_index >= push_index {
                        // This thread is also the producer, so the slot at
                        // `push_index` was never written: bump the tail index
                        // past it and mark it consumed instead of reading it.
                        assert_eq!(pop_index, push_index);
                        self.tail.index.store(push_index + 1, Ordering::Relaxed);
                        if block.mark_slots_read(1) {
                            let _unused_block = unsafe { Box::from_raw(block) };
                        }
                        return None;
                    }
let v = block.get(id);
if block.mark_slots_read(1) {
let _unused_block = unsafe { Box::from_raw(block) };
}
return Some(v);
}
Err(i) => {
head = i;
backoff.spin();
}
}
}
}
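    /// Pops every ready value in the head block in one batch; returns an
    /// empty vector if the queue is empty.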
    pub fn bulk_pop(&self) -> SmallVec<[T; BLOCK_SIZE]> {
        let backoff = Backoff::new();
        let mut head = self.head.0.load(Ordering::Acquire);
let mut push_index = self.tail.index.load(Ordering::Acquire);
let mut tail_block = self.tail.block.load(Ordering::Acquire);
loop {
head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
let (block, id) = BlockPtr::unpack(head);
let push_id = push_index & BLOCK_MASK;
if block == tail_block && id >= push_id {
return SmallVec::new();
}
            // If head is not in the tail block, claim the whole remainder of
            // the block; otherwise claim up to the producer's current index.
            let new_id = if block != tail_block { 0 } else { push_id };
            let new_head = if new_id == 0 {
                // Claiming through the end of the block: mark head as in
                // transition until `next` is installed.
                (head as usize | (1 << 63)) as *mut BlockNode<T>
            } else {
                BlockPtr::pack(block, new_id)
            };
let block = unsafe { &mut *block };
match self.head.0.compare_exchange_weak(
head,
new_head,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
let block_start = block.start.load(Ordering::Relaxed);
let pop_index = block_start + id;
let end;
if new_id == 0 {
push_index = self.tail.index.load(Ordering::Acquire);
if pop_index >= push_index {
self.head.0.store(head, Ordering::Release);
return SmallVec::new();
}
end = std::cmp::min(block_start + BLOCK_SIZE, push_index);
let new_id = end & BLOCK_MASK;
if new_id == 0 {
let next = block.next.load(Ordering::Acquire);
self.head.0.store(next, Ordering::Release);
} else {
let new_head = BlockPtr::pack(block, new_id);
self.head.0.store(new_head, Ordering::Release);
}
} else {
end = block_start + new_id;
                        // Wait for the producer to publish every claimed slot.
                        while end > self.tail.index.load(Ordering::Acquire) {
                            backoff.snooze();
                        }
}
let value = block.copy_to_bulk(pop_index, end);
if block.mark_slots_read(end - pop_index) {
let _unused_block = unsafe { Box::from_raw(block) };
}
return value;
}
                Err(i) => {
                    head = i;
                    backoff.spin();
                    push_index = self.tail.index.load(Ordering::Acquire);
                    tail_block = self.tail.block.load(Ordering::Acquire);
                }
}
}
}
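    /// Returns the number of elements currently in the queue.
    ///
    /// # Safety
    /// This dereferences the head block, which a concurrent popper may free;
    /// the caller must ensure no consumer can release it during the call.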
pub unsafe fn len(&self) -> usize {
let head = self.head.0.load(Ordering::Acquire);
let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
let (block, id) = BlockPtr::unpack(head);
let block = unsafe { &mut *block };
let block_start = block.start.load(Ordering::Relaxed);
let pop_index = block_start + id;
let push_index = self.tail.index.load(Ordering::Acquire);
push_index.wrapping_sub(pop_index)
}
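    /// Returns true if the queue currently holds no poppable elements.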
pub fn is_empty(&self) -> bool {
let head = self.head.0.load(Ordering::Acquire);
let head = (head as usize & !(1 << 63)) as *mut BlockNode<T>;
let (block, id) = BlockPtr::unpack(head);
let push_index = self.tail.index.load(Ordering::Acquire);
let tail_block = self.tail.block.load(Ordering::Acquire);
block == tail_block && id == (push_index & BLOCK_MASK)
}
}
impl<T> Default for Queue<T> {
fn default() -> Self {
Queue::new()
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
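        // Drain remaining values so they are dropped, then free the final
        // block, which head and tail must both point at.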
while !self.bulk_pop().is_empty() {}
let head = self.head.0.load(Ordering::Acquire);
let (block, _id) = BlockPtr::unpack(head);
let tail = self.tail.block.load(Ordering::Acquire);
assert_eq!(block, tail);
let _unused_block = unsafe { Box::from_raw(block) };
}
}
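/// Creates a single-producer queue split into a `Local` handle for the owner
/// thread and a cloneable `Steal` handle for other threads.
///
/// A minimal usage sketch:
///
/// ```ignore
/// let (stealer, mut local) = local::<usize>();
/// local.push_back(1);
/// assert_eq!(local.pop(), Some(1));
/// assert!(stealer.is_empty());
/// ```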
pub fn local<T: 'static>() -> (Steal<T>, Local<T>) {
let inner = Arc::new(Queue::new());
let local = Local(inner.clone());
let remote = Steal(inner);
(remote, local)
}
/// The owner-side handle: pushes and pops on its own thread.
pub struct Local<T: 'static>(Arc<Queue<T>>);
/// The shared handle other threads use to steal tasks.
pub struct Steal<T: 'static>(Arc<Queue<T>>);
impl<T> Local<T> {
    /// Returns true if other threads could currently steal from this queue.
    #[inline]
    pub fn is_stealable(&self) -> bool {
        !self.0.is_empty()
    }
    /// Returns true if the queue holds any tasks.
    #[inline]
    pub fn has_tasks(&self) -> bool {
        !self.0.is_empty()
    }
    /// Pushes a task onto the tail; only the owning thread may call this.
    #[inline]
    pub fn push_back(&mut self, task: T) {
        self.0.push(task)
    }
    /// Pops a task from the head on the owning thread.
    #[inline]
    pub fn pop(&mut self) -> Option<T> {
        self.0.local_pop()
    }
}
impl<T> Steal<T> {
#[inline]
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}
    /// Steals a batch of tasks: moves all but the last into `dst` and
    /// returns that last task, or `None` if nothing was stolen.
    #[inline]
    pub fn steal_into(&self, dst: &mut Local<T>) -> Option<T> {
        // Never steal from ourselves. Compare the shared queue allocations
        // with `Arc::ptr_eq`; `std::ptr::eq(&self.0, &dst.0)` would compare
        // the addresses of the two handle fields, which can never be equal.
        if Arc::ptr_eq(&self.0, &dst.0) {
            return None;
        }
        // Take a batch; run the last stolen task now and queue the rest.
        let mut v = self.0.bulk_pop();
        let ret = v.pop();
        for t in v {
            dst.push_back(t);
        }
        ret
}
}
impl<T> Clone for Steal<T> {
fn clone(&self) -> Steal<T> {
Steal(self.0.clone())
}
}
impl<T> Drop for Local<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
assert!(self.pop().is_none(), "queue not empty");
}
}
}
#[cfg(all(nightly, test))]
mod test {
extern crate test;
use self::test::Bencher;
use super::*;
use std::thread;
use crate::test_queue::ScBlockPop;
impl<T> ScBlockPop<T> for super::Queue<T> {
fn block_pop(&self) -> T {
let backoff = Backoff::new();
loop {
match self.pop() {
Some(v) => return v,
None => backoff.snooze(),
}
}
}
}
#[test]
fn queue_sanity() {
let q = Queue::<usize>::new();
assert!(q.is_empty());
for i in 0..100 {
q.push(i);
}
assert_eq!(unsafe { q.len() }, 100);
println!("{q:?}");
for i in 0..100 {
assert_eq!(q.pop(), Some(i));
}
assert_eq!(q.pop(), None);
assert!(q.is_empty());
}
#[bench]
fn single_thread_test(b: &mut Bencher) {
let q = Queue::new();
let mut i = 0;
b.iter(|| {
q.push(i);
assert_eq!(q.pop(), Some(i));
i += 1;
});
}
#[bench]
fn multi_1p1c_test(b: &mut Bencher) {
b.iter(|| {
let q = Arc::new(Queue::new());
let total_work: usize = 1_000_000;
let _q = q.clone();
thread::spawn(move || {
for i in 0..total_work {
_q.push(i);
}
});
for i in 0..total_work {
let v = q.block_pop();
assert_eq!(i, v);
}
});
}
#[bench]
fn multi_1p2c_test(b: &mut Bencher) {
b.iter(|| {
let q = Arc::new(Queue::new());
let total_work: usize = 1_000_000;
for i in 0..total_work {
q.push(i);
}
let sum = AtomicUsize::new(0);
let threads = 20;
thread::scope(|s| {
(0..threads).for_each(|_| {
s.spawn(|| {
let mut total = 0;
for _i in 0..total_work / threads {
total += q.block_pop();
}
sum.fetch_add(total, Ordering::Relaxed);
});
});
});
assert!(q.is_empty());
assert_eq!(sum.load(Ordering::Relaxed), (0..total_work).sum());
});
}
#[bench]
fn bulk_1p2c_test(b: &mut Bencher) {
b.iter(|| {
let q = Arc::new(Queue::new());
let total_work: usize = 1_000_000;
for i in 0..total_work {
q.push(i);
}
let total = Arc::new(AtomicUsize::new(0));
thread::scope(|s| {
let threads = 20;
for _ in 0..threads {
let q = q.clone();
let total = total.clone();
s.spawn(move || {
while !q.is_empty() {
let v = q.bulk_pop();
if !v.is_empty() {
total.fetch_add(v.len(), Ordering::AcqRel);
}
}
});
}
});
assert!(q.is_empty());
assert_eq!(total.load(Ordering::Acquire), total_work);
});
}
}