use crate::cache_pad::CachePad;
use crate::node::{Node, NODE_CAPACITY, NODE_SIZE};
use crate::slot::{DRAINING, FILLED, READING};
use crate::variant::sync::atomic::{fence, AtomicPtr, AtomicUsize, Ordering};
use crate::variant::sync::Arc;
use crate::variant::thread;
use std::mem::MaybeUninit;
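/// An unbounded concurrent queue (multi-producer, multi-consumer).
///
/// Items live in a linked list of fixed-capacity nodes; producers and
/// consumers coordinate through the shared inner state. Cloning a `Queue`
/// is cheap and yields another handle to the same underlying queue.
///
/// # Examples
///
/// A minimal usage sketch (with `Queue` imported from this crate):
///
/// ```ignore
/// let queue = Queue::new();
/// queue.push(1);
/// queue.push(2);
/// assert_eq!(queue.pop(), Some(1));
/// assert_eq!(queue.pop(), Some(2));
/// assert_eq!(queue.pop(), None);
/// ```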
#[derive(Clone, Debug)]
pub struct Queue<T> {
inner: Arc<Inner<T>>,
}
impl<T> Queue<T> {
pub fn new() -> Self {
Self {
inner: Arc::new(Inner::new()),
}
}
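    /// Pushes `item` onto the back of the queue.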
pub fn push(&self, item: T) {
self.inner.push(item)
}
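    /// Removes and returns the item at the front of the queue, or `None` if
    /// the queue is currently empty.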
pub fn pop(&self) -> Option<T> {
self.inner.pop()
}
}
impl<T> Default for Queue<T> {
fn default() -> Self {
Self::new()
}
}
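/// Shared queue state: a linked list of nodes with a head cursor used by
/// `pop` and a tail cursor used by `push`, each padded onto its own cache
/// line to avoid false sharing between producers and consumers.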
#[derive(Debug)]
struct Inner<T> {
head: CachePad<Cursor<T>>,
tail: CachePad<Cursor<T>>,
}
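// The cursors only store raw pointers, so the compiler would otherwise derive
// `Send`/`Sync` for every `T`. Items are moved across threads through the
// queue, so both are restricted to `T: Send` here (assuming the crate does
// not already declare these impls elsewhere).
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}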
impl<T> Inner<T> {
fn new() -> Self {
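        // Both cursors start at index 0 and point at the same, initially
        // empty node.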
#[cfg(not(loom))]
let node: Node<T> = Node::UNINIT;
#[cfg(loom)]
let node: Node<T> = Node::new();
let first_node: *mut CachePad<Node<T>> = Box::into_raw(Box::new(CachePad::new(node)));
Self {
head: CachePad::new(Cursor {
index: AtomicUsize::new(0),
node: AtomicPtr::new(first_node),
}),
tail: CachePad::new(Cursor {
index: AtomicUsize::new(0),
node: AtomicPtr::new(first_node),
}),
}
}
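    /// Pushes `item` by claiming a slot index from the tail cursor and
    /// writing into the corresponding slot of the tail node.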
fn push(&self, item: T) {
let mut tail_index = self.tail.index.load(Ordering::Acquire);
let mut tail_node = self.tail.node.load(Ordering::Acquire);
loop {
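            // Indices count slots in their upper bits; the low bit is
            // reserved for the mark bit, hence the shift before locating the
            // slot.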
let offset = (tail_index >> MARK_BIT_SHIFT) % NODE_SIZE;
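            // The tail node is full: wait for the producer that claimed its
            // last slot to install the next node and advance the tail.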
if offset == NODE_CAPACITY {
thread::yield_now();
tail_index = self.tail.index.load(Ordering::Acquire);
tail_node = self.tail.node.load(Ordering::Acquire);
continue;
}
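            // Try to claim slot `offset` by advancing the tail index by one
            // slot.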
let next_tail_index = tail_index + (1 << MARK_BIT_SHIFT);
match self.tail.index.compare_exchange_weak(
tail_index,
next_tail_index,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
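                    // This call now owns slot `offset` of `tail_node`. If it
                    // claimed the last slot, allocate and publish the next
                    // node before filling the slot.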
if offset + 1 == NODE_CAPACITY {
#[cfg(not(loom))]
let node: Node<T> = Node::UNINIT;
#[cfg(loom)]
let node: Node<T> = Node::new();
let next_node = Box::into_raw(Box::new(CachePad::new(node)));
self.tail.node.store(next_node, Ordering::Release);
let _ = self
.tail
.index
.fetch_add(1 << MARK_BIT_SHIFT, Ordering::Release);
(*tail_node).next.store(next_node, Ordering::Release);
}
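                    // Write the item, then mark the slot FILLED so a
                    // concurrent `pop` can observe it.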
let slot = (*tail_node).container.get_unchecked(offset);
slot.item.with_mut(|p| p.write(MaybeUninit::new(item)));
let _ = slot.state.fetch_or(FILLED, Ordering::Release);
return;
},
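                // Another producer won the race for this index; retry with
                // the values it published.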
Err(current_tail_index) => {
tail_index = current_tail_index;
tail_node = self.tail.node.load(Ordering::Acquire);
}
}
}
}
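    /// Pops the front item, if any, by claiming a slot index from the head
    /// cursor and reading the corresponding slot of the head node.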
fn pop(&self) -> Option<T> {
let mut head_index = self.head.index.load(Ordering::Acquire);
let mut head_node = self.head.node.load(Ordering::Acquire);
loop {
let offset = (head_index >> MARK_BIT_SHIFT) % NODE_SIZE;
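            // Every slot in the head node has been claimed: wait for the
            // consumer that claimed its last slot to advance the head to the
            // next node.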
if offset == NODE_CAPACITY {
thread::yield_now();
head_index = self.head.index.load(Ordering::Acquire);
head_node = self.head.node.load(Ordering::Acquire);
continue;
}
let mut next_head_index = head_index + (1 << MARK_BIT_SHIFT);
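            // Without the mark bit the head node may be the last one, so the
            // queue could be empty: re-check against the tail, and record the
            // mark bit once the tail has moved on to a later node.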
if next_head_index & MARK_BIT == 0 {
fence(Ordering::SeqCst);
let tail_index = self.tail.index.load(Ordering::Acquire);
if head_index >> MARK_BIT_SHIFT == tail_index >> MARK_BIT_SHIFT {
return None;
}
if (head_index >> MARK_BIT_SHIFT) / NODE_SIZE
!= (tail_index >> MARK_BIT_SHIFT) / NODE_SIZE
{
next_head_index |= MARK_BIT;
}
}
match self.head.index.compare_exchange_weak(
head_index,
next_head_index,
Ordering::SeqCst,
Ordering::Acquire,
) {
Ok(_) => unsafe {
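                    // If this claimed the last slot of the node, wait for the
                    // next node and swing the head cursor over to it.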
if offset + 1 == NODE_CAPACITY {
let next_node = (*head_node).wait_next();
let mut next_index =
(next_head_index & !MARK_BIT).wrapping_add(1 << MARK_BIT_SHIFT);
if !(*next_node).next.load(Ordering::Relaxed).is_null() {
next_index |= MARK_BIT;
}
self.head.node.store(next_node, Ordering::Release);
self.head.index.store(next_index, Ordering::Release);
}
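                    // Wait until the producer has marked the slot FILLED,
                    // then move the item out.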
let slot = (*head_node).container.get_unchecked(offset);
slot.wait_filled();
let item = slot.item.with(|p| p.read().assume_init());
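                    // Reclaim the node once every slot has been read: the pop
                    // that took the last slot drains from the start, while a
                    // slower pop that finds DRAINING already set on its slot
                    // continues the drain from the following slot.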
if offset + 1 == NODE_CAPACITY {
Node::drain(head_node, 0);
} else if slot.state.fetch_or(READING, Ordering::AcqRel) & DRAINING != 0 {
Node::drain(head_node, offset + 1);
}
return Some(item);
},
Err(current_head_index) => {
head_index = current_head_index;
head_node = self.head.node.load(Ordering::Acquire);
}
}
}
}
}
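/// One end of the queue: a packed slot index (see `MARK_BIT_SHIFT` and
/// `MARK_BIT`) together with a pointer to the node that index falls in.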
#[derive(Debug)]
struct Cursor<T> {
index: AtomicUsize,
node: AtomicPtr<CachePad<Node<T>>>,
}
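// A minimal `Drop` sketch (assuming the crate does not define one elsewhere):
// `pop` frees every node except the one the head cursor finally rests on, so
// drain the remaining items and then release that last node. During drop this
// thread has exclusive access, so plain `pop` calls suffice.
impl<T> Drop for Inner<T> {
    fn drop(&mut self) {
        // Drop any items that were pushed but never popped; this also frees
        // every fully consumed node via `Node::drain`.
        while self.pop().is_some() {}
        // Free the final node, which `pop` never deallocates.
        let head_node = self.head.node.load(Ordering::Acquire);
        if !head_node.is_null() {
            unsafe { drop(Box::from_raw(head_node)) };
        }
    }
}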
/// Number of low-order index bits reserved for the mark bit; the remaining
/// bits count claimed slots.
const MARK_BIT_SHIFT: usize = 1;
/// Set in a head index once the head node is known not to be the last node,
/// which lets `pop` skip the empty-queue check against the tail.
const MARK_BIT: usize = 1;