use crossbeam_utils::CachePadded;
use smallvec::SmallVec;
use crate::atomic::{AtomicPtr, AtomicUsize};
use std::cell::UnsafeCell;
use std::cmp;
use std::marker::PhantomData;
use std::mem::MaybeUninit;
use std::ptr;
use std::sync::atomic::Ordering;
/// Number of index bits used to address a slot inside one block.
pub const BLOCK_SHIFT: usize = 5;
/// Number of value slots held by a single `BlockNode` (1 << BLOCK_SHIFT).
pub const BLOCK_SIZE: usize = 1 << BLOCK_SHIFT;
/// Mask that maps a global running index onto a slot index within a block.
pub const BLOCK_MASK: usize = BLOCK_SIZE - 1;
/// A single storage cell inside a block.
///
/// `UnsafeCell<MaybeUninit<T>>` lets the producer write and the consumer read
/// through a shared reference; whether a slot is initialized is tracked
/// solely by the queue's head/tail indices, never by the slot itself.
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
}
impl<T> Slot<T> {
// An uninitialized slot. Declared as an associated `const` so it can seed
// the `[Slot::UNINIT; BLOCK_SIZE]` array initializer in `BlockNode::new`
// (array-repeat requires a `const` item for non-`Copy` element types).
const UNINIT: Self = Self {
value: UnsafeCell::new(MaybeUninit::uninit()),
};
}
/// One fixed-size segment of the unbounded queue: `BLOCK_SIZE` slots plus a
/// link to the next segment, forming a singly linked list of blocks that the
/// producer grows and the consumer frees.
struct BlockNode<T> {
data: [Slot<T>; BLOCK_SIZE],
// Null until the producer fills this block and links its successor.
next: AtomicPtr<BlockNode<T>>,
}
impl<T> BlockNode<T> {
    /// Allocates a fresh block with every slot uninitialized and no successor.
    #[inline]
    pub fn new() -> *mut BlockNode<T> {
        Box::into_raw(Box::new(BlockNode {
            next: AtomicPtr::new(ptr::null_mut()),
            data: [Slot::UNINIT; BLOCK_SIZE],
        }))
    }
    /// Writes `v` into the slot addressed by the global `index`.
    ///
    /// Only the low `BLOCK_SHIFT` bits are used, so callers may pass the
    /// unmasked running index. Visibility to the consumer is established
    /// later by the queue's release store of the tail index.
    #[inline]
    pub fn set(&self, index: usize, v: T) {
        unsafe {
            // SAFETY: masking keeps the slot index in 0..BLOCK_SIZE.
            let data = self.data.get_unchecked(index & BLOCK_MASK);
            data.value.get().write(MaybeUninit::new(v));
        }
    }
    /// Returns a reference to the value at the global `index` without
    /// consuming it.
    ///
    /// # Safety
    ///
    /// The slot addressed by `index` must have been initialized by a prior
    /// `set` visible to this thread, and must not be concurrently overwritten
    /// or taken while the returned reference is alive.
    #[inline]
    pub unsafe fn peek(&self, index: usize) -> &T {
        // SAFETY: masking keeps the slot index in 0..BLOCK_SIZE.
        let data = self.data.get_unchecked(index & BLOCK_MASK);
        (*data.value.get()).assume_init_ref()
    }
    /// Moves the value out of the slot addressed by the global `id`.
    ///
    /// Fix: the index is now masked internally, matching `set`/`peek`.
    /// Previously this safe `pub fn` passed the raw `id` straight to
    /// `get_unchecked`, so an out-of-range id was UB reachable from safe
    /// code. Masking is idempotent for all existing callers (`pop` pre-masks;
    /// `copy_to_bulk` never crosses the block boundary).
    ///
    /// The caller must still guarantee the slot was initialized and is read
    /// at most once; the queue's head-index bookkeeping enforces that.
    #[inline]
    pub fn get(&self, id: usize) -> T {
        unsafe {
            // SAFETY: masking keeps the slot index in 0..BLOCK_SIZE.
            let data = self.data.get_unchecked(id & BLOCK_MASK);
            data.value.get().read().assume_init()
        }
    }
    /// Moves the values for global indices `start..end` out of this block.
    ///
    /// `end` must not cross the boundary of the block containing `start`
    /// (see `bulk_end`), so the masked indices increase monotonically and
    /// stay inside this block.
    #[inline]
    pub fn copy_to_bulk(&self, start: usize, end: usize) -> SmallVec<[T; BLOCK_SIZE]> {
        // `get` masks internally, so the raw global range can be used as-is.
        (start..end).map(|id| self.get(id)).collect()
    }
}
#[inline]
pub fn bulk_end(start: usize, end: usize) -> usize {
let block_end = (start + BLOCK_SIZE) & !BLOCK_MASK;
cmp::min(end, block_end)
}
/// One end (head or tail) of the queue: a running element index plus a
/// pointer to the block that index currently falls in.
#[derive(Debug)]
struct Position<T> {
// Monotonically increasing global index; the low BLOCK_SHIFT bits select
// the slot inside `block`.
index: AtomicUsize,
block: AtomicPtr<BlockNode<T>>,
}
impl<T> Position<T> {
/// Creates a position at index 0 inside the given block.
fn new(block: *mut BlockNode<T>) -> Self {
Position {
index: AtomicUsize::new(0),
block: AtomicPtr::new(block),
}
}
}
/// An unbounded SPSC (single-producer, single-consumer) queue built from a
/// linked list of fixed-size blocks.
///
/// The producer side owns `tail`, the consumer side owns `head`;
/// `CachePadded` keeps the two positions on separate cache lines to avoid
/// false sharing between the two threads.
#[derive(Debug)]
pub struct Queue<T> {
tail: CachePadded<Position<T>>,
head: Position<T>,
_marker: PhantomData<T>,
}
// SAFETY: values of `T` are moved across threads through the queue, hence the
// `T: Send` bound. The push/pop protocol additionally assumes exactly one
// producer thread and one consumer thread (see the `unsync_load` usage in
// `push`/`pop`); callers must uphold that SPSC contract.
unsafe impl<T: Send> Send for Queue<T> {}
unsafe impl<T: Send> Sync for Queue<T> {}
impl<T> Queue<T> {
/// Creates an empty queue; head and tail start at index 0 in the same
/// freshly allocated block.
pub fn new() -> Self {
let init_block = BlockNode::<T>::new();
Queue {
head: Position::new(init_block),
tail: Position::new(init_block).into(),
_marker: PhantomData,
}
}
/// Appends a value at the tail.
///
/// NOTE(review): uses unsynchronized loads of the tail position, so this
/// is sound only when exactly one thread ever pushes (SPSC producer side).
pub fn push(&self, v: T) {
let tail = unsafe { &mut *self.tail.block.unsync_load() };
let push_index = unsafe { self.tail.index.unsync_load() };
tail.set(push_index, v);
// Order the slot write (and the next-block link below) before the
// index store that publishes the element to the consumer.
std::sync::atomic::fence(Ordering::Release);
let new_index = push_index.wrapping_add(1);
if new_index & BLOCK_MASK == 0 {
// The current block just filled up: allocate, link, and switch to a
// fresh tail block. Relaxed suffices here; the release store of
// `index` below is what makes the link visible to the consumer.
let new_tail = BlockNode::new();
tail.next.store(new_tail, Ordering::Relaxed);
self.tail.block.store(new_tail, Ordering::Relaxed);
}
self.tail.index.store(new_index, Ordering::Release);
}
/// Returns a reference to the front element without removing it, or
/// `None` if the queue is empty.
///
/// # Safety
///
/// Must be called only from the single consumer thread, and the returned
/// reference must not outlive the next `pop`/`bulk_pop`, which may free
/// the block it points into.
pub unsafe fn peek(&self) -> Option<&T> {
let index = self.head.index.unsync_load();
// Acquire pairs with the producer's release store in `push`.
let push_index = self.tail.index.load(Ordering::Acquire);
if index == push_index {
return None;
}
let head = &mut *self.head.block.unsync_load();
Some(head.peek(index))
}
/// Removes and returns the front element, or `None` if the queue is
/// empty. Consumer side only: unsynchronized loads of `head` and the
/// block reclamation below assume a single consumer thread.
pub fn pop(&self) -> Option<T> {
let index = unsafe { self.head.index.unsync_load() };
// Acquire pairs with the producer's release store in `push`, making
// the slot contents and any new block link visible to this thread.
let push_index = self.tail.index.load(Ordering::Acquire);
if index == push_index {
return None;
}
let head = unsafe { &mut *self.head.block.unsync_load() };
let v = head.get(index & BLOCK_MASK);
let new_index = index.wrapping_add(1);
if new_index & BLOCK_MASK == 0 {
// Consumed the last slot of this block: advance to the successor
// and free the exhausted block. `next` is non-null here because the
// producer links it before publishing the index that made this
// branch reachable.
let new_head = head.next.load(Ordering::Relaxed);
let _unused_head = unsafe { Box::from_raw(head) };
self.head.block.store(new_head, Ordering::Relaxed);
}
self.head.index.store(new_index, Ordering::Relaxed);
Some(v)
}
/// Number of elements currently queued. When called concurrently with
/// push/pop this is only a racy snapshot.
#[inline]
pub fn len(&self) -> usize {
let pop_index = self.head.index.load(Ordering::Relaxed);
let push_index = self.tail.index.load(Ordering::Acquire);
push_index.wrapping_sub(pop_index)
}
/// Returns `true` when no element is queued (same snapshot caveat as
/// `len`).
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Pops every available element up to the end of the current head block
/// (at most `BLOCK_SIZE` items) in one call. Consumer side only, same
/// contract as `pop`.
pub fn bulk_pop(&self) -> SmallVec<[T; BLOCK_SIZE]> {
let index = unsafe { self.head.index.unsync_load() };
let push_index = self.tail.index.load(Ordering::Acquire);
if index == push_index {
return SmallVec::new();
}
let head = unsafe { &mut *self.head.block.unsync_load() };
// Clamp so a single pass never reads past the current block.
let end = bulk_end(index, push_index);
let value = head.copy_to_bulk(index, end);
let new_index = end;
if new_index & BLOCK_MASK == 0 {
// Same block hand-over and reclamation as in `pop`.
let new_head = head.next.load(Ordering::Relaxed);
let _unused_head = unsafe { Box::from_raw(head) };
self.head.block.store(new_head, Ordering::Relaxed);
}
self.head.index.store(new_index, Ordering::Relaxed);
value
}
}
impl<T> Default for Queue<T> {
fn default() -> Self {
Queue::new()
}
}
impl<T> Drop for Queue<T> {
fn drop(&mut self) {
// Drain any remaining elements so their destructors run.
while !self.bulk_pop().is_empty() {}
// Once empty, head and tail must sit in the same (final) block: every
// earlier block was freed when the consumer crossed its boundary.
let head = self.head.block.load(Ordering::Relaxed);
let tail = self.tail.block.load(Ordering::Relaxed);
assert_eq!(head, tail);
// Free the one block that remains.
unsafe {
let _unused_block = Box::from_raw(head);
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Push 100 items, then verify FIFO order and emptiness on the way out.
    #[test]
    fn queue_sanity() {
        let q = Queue::<usize>::new();
        assert_eq!(q.len(), 0);
        (0..100).for_each(|i| q.push(i));
        assert_eq!(q.len(), 100);
        println!("{q:?}");
        for expected in 0..100 {
            assert_eq!(q.pop(), Some(expected));
        }
        assert_eq!(q.pop(), None);
        assert_eq!(q.len(), 0);
    }

    /// With more than one block queued, the first `bulk_pop` drains exactly
    /// one block and the second drains the remainder.
    #[test]
    fn bulk_pop_test() {
        let q = Queue::<usize>::new();
        let total_size = BLOCK_SIZE + 17;
        (0..total_size).for_each(|i| q.push(i));
        let first = q.bulk_pop();
        assert_eq!(first.len(), BLOCK_SIZE);
        assert_eq!(q.len(), total_size - BLOCK_SIZE);
        let second = q.bulk_pop();
        assert_eq!(second[0], BLOCK_SIZE);
        assert_eq!(second.len(), 17);
        assert_eq!(q.len(), 0);
        println!("{q:?}");
        for (expected, item) in first.iter().enumerate() {
            assert_eq!(*item, expected);
        }
    }
}
#[cfg(all(nightly, test))]
mod bench {
extern crate test;
use self::test::Bencher;
use super::*;
use std::sync::Arc;
use std::thread;
use crate::test_queue::ScBlockPop;
// Blocking pop adapter used by the 1p1c benchmarks: spin/snooze via
// crossbeam's Backoff until an element becomes available.
impl<T> ScBlockPop<T> for super::Queue<T> {
fn block_pop(&self) -> T {
let backoff = crossbeam_utils::Backoff::new();
loop {
match self.pop() {
Some(v) => return v,
None => backoff.snooze(),
}
}
}
}
// NOTE(review): this is a #[test], but the module is gated on
// `cfg(all(nightly, test))`, so it never runs on stable builds — consider
// moving it into the `tests` module.
#[test]
fn spsc_peek() {
let q = Queue::new();
assert_eq!(unsafe { q.peek() }, None);
q.push(1);
assert_eq!(unsafe { q.peek() }, Some(&1));
let v = q.pop();
assert_eq!(v, Some(1));
assert_eq!(unsafe { q.peek() }, None);
}
// One producer thread pushes 1M items; this thread consumes them with
// bulk_pop, checking FIFO order as it goes.
#[bench]
fn bulk_pop_1p1c_bench(b: &mut Bencher) {
b.iter(|| {
let q = Arc::new(Queue::new());
let total_work: usize = 1_000_000;
let _q = q.clone();
thread::spawn(move || {
for i in 0..total_work {
_q.push(i);
}
});
let mut size = 0;
while size < total_work {
let v = q.bulk_pop();
for (start, i) in v.iter().enumerate() {
assert_eq!(*i, start + size);
}
size += v.len();
}
});
}
// Ping-pong push/pop on a single thread: measures per-element overhead
// without any cross-thread traffic.
#[bench]
fn single_thread_test(b: &mut Bencher) {
let q = Queue::new();
let mut i = 0;
b.iter(|| {
q.push(i);
assert_eq!(q.pop(), Some(i));
i += 1;
});
}
// Same 1p1c workload as above, but consuming one element at a time via
// the blocking pop adapter.
#[bench]
fn multi_1p1c_test(b: &mut Bencher) {
b.iter(|| {
let q = Arc::new(Queue::new());
let total_work: usize = 1_000_000;
let _q = q.clone();
thread::spawn(move || {
for i in 0..total_work {
_q.push(i);
}
});
for i in 0..total_work {
let v = q.block_pop();
assert_eq!(i, v);
}
});
}
}