use array_macro::array;
use peril::{HazardPointer, HazardRecord, HazardRegistry, HazardValue, Ordering};
use std::sync::atomic::{AtomicPtr, AtomicU32};
#[cfg(test)]
mod tests;
/// Number of item slots in each node (segment) of the queue.
const BUFFER_SIZE: u32 = 1024;
/// Sentinel pointer a dequeuer swaps into a slot to claim it. A slow
/// enqueuer's CAS from null then fails, so it cannot deposit into a slot
/// that has already been consumed or abandoned.
const TAKEN: *mut () = 1 as *mut ();
/// One segment of the queue: a fixed-size array of item slots plus a
/// hazard-protected link to the next segment.
///
/// Slot states: null = not yet filled, `TAKEN` = consumed/abandoned,
/// anything else = a `Box<T>` raw pointer deposited by an enqueuer.
struct Node<T> {
    // Index of the next slot to dequeue; may overshoot BUFFER_SIZE.
    deqidx: AtomicU32,
    // deqidx and enqidx sit on opposite sides of the array, presumably to
    // keep the two contended counters on different cache lines — confirm.
    items: [AtomicPtr<T>; BUFFER_SIZE as usize],
    // Index of the next slot to enqueue; may overshoot BUFFER_SIZE.
    enqidx: AtomicU32,
    // Successor node; a dummy value (0) means "no successor installed yet".
    next: HazardPointer<Node<T>>,
}
impl<T: Send> Node<T> {
    /// Creates a segment whose first slot already holds `item` (possibly
    /// null for the sentinel): the enqueue index therefore starts at 1
    /// while the dequeue index starts at 0.
    fn new(item: *mut T) -> Self {
        let slots: [AtomicPtr<T>; BUFFER_SIZE as usize] =
            array![_ => Default::default(); BUFFER_SIZE as usize];
        slots[0].store(item, Ordering::Relaxed);
        Node {
            items: slots,
            deqidx: AtomicU32::new(0),
            enqidx: AtomicU32::new(1),
            next: HazardPointer::new(HazardValue::dummy(0)),
        }
    }
    /// Atomically swings `next` from `cmp` to `val`; returns true when the
    /// exchange succeeded.
    fn cas_next<'registry>(
        &self,
        registry: &'registry HazardRegistry<Node<T>>,
        cmp: HazardValue<Node<T>>,
        val: HazardValue<Node<T>>,
    ) -> bool {
        matches!(
            self.next
                .compare_exchange(registry, cmp, val, Ordering::AcqRel, Ordering::Relaxed),
            Ok(_)
        )
    }
}
/// Wrapper forcing 128-byte alignment, presumably so the queue's `head`
/// and `tail` pointers land on distinct cache lines and do not falsely
/// share — TODO confirm target cache-line assumptions.
#[repr(align(128))]
struct AlignedHazardPtr<T: Send>(HazardPointer<T>);
/// Lock-free multi-producer multi-consumer FIFO queue built from a linked
/// list of fixed-size array segments; node reclamation is handled by the
/// hazard-pointer registry.
pub struct FaaArrayQueue<T: Send> {
    // Registry that tracks hazard records and retires nodes safely.
    registry: HazardRegistry<Node<T>>,
    // Oldest segment; dequeuers read from here.
    head: AlignedHazardPtr<Node<T>>,
    // Newest segment; enqueuers write here.
    tail: AlignedHazardPtr<Node<T>>,
}
impl<T: Send> Drop for FaaArrayQueue<T> {
    /// Drains all remaining items so every still-queued `Box<T>` payload
    /// is reboxed and freed before the queue structure goes away.
    fn drop(&mut self) {
        while self.dequeue().is_some() {}
    }
}
impl<T: Send> Default for FaaArrayQueue<T> {
    /// Builds an empty queue: head and tail both start out referring to a
    /// single sentinel node whose first slot is null.
    fn default() -> Self {
        let sentinel = HazardValue::boxed(Node::new(std::ptr::null_mut()));
        FaaArrayQueue {
            registry: HazardRegistry::default(),
            head: AlignedHazardPtr(HazardPointer::new(sentinel.clone())),
            tail: AlignedHazardPtr(HazardPointer::new(sentinel)),
        }
    }
}
impl<T: Send> FaaArrayQueue<T> {
    /// Appends `item` to the tail of the queue.
    ///
    /// The item is boxed and its raw pointer published into a slot via a
    /// CAS from null; hazard pointers pin the tail node while it is
    /// inspected so it cannot be reclaimed underneath us.
    pub fn enqueue(&self, item: T) {
        let item = Box::new(item);
        let item = Box::into_raw(item);
        let mut record = HazardRecord::default();
        loop {
            // Pin the current tail node.
            let scope = self.tail.0.protect(&self.registry, &mut record);
            let ltail = scope.as_ref().unwrap();
            // Reserve a slot index; indices past the end are simply burned.
            let idx = ltail.enqidx.fetch_add(1, Ordering::AcqRel);
            if idx > (BUFFER_SIZE - 1) {
                // Node is full. If the shared tail already moved on, retry
                // against the new tail instead of extending this node.
                if scope.changed(Ordering::Acquire) {
                    drop(scope);
                    continue;
                }
                // Snapshot the successor link (hazard-protected while copied).
                let lnext = {
                    let mut record2 = HazardRecord::default();
                    let scope2 = ltail.next.protect(&self.registry, &mut record2);
                    scope2.clone_value()
                };
                if lnext.is_dummy() {
                    // No successor yet: try to install a fresh node that
                    // already carries our item in its first slot.
                    let new_node = HazardValue::boxed(Node::new(item));
                    let cloned_node = new_node.clone();
                    if ltail.cas_next(&self.registry, HazardValue::dummy(0), new_node) {
                        // Swing the shared tail forward; losing this CAS is
                        // fine — another thread already advanced it.
                        let _ = scope.compare_exchange(
                            cloned_node,
                            Ordering::AcqRel,
                            Ordering::Relaxed,
                        );
                        return;
                    }
                    // cas_next lost: the speculative node is dropped; the
                    // raw `item` pointer is still owned here and retried on
                    // the next iteration.
                } else {
                    // Help advance the tail to the existing successor.
                    let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
                }
                continue;
            }
            // Deposit the item. The CAS fails if a dequeuer already poisoned
            // this slot with TAKEN; then the index is burned and we retry.
            if ltail.items[idx as usize]
                .compare_exchange(
                    std::ptr::null_mut(),
                    item,
                    Ordering::AcqRel,
                    Ordering::Relaxed,
                )
                .is_ok()
            {
                return;
            }
        }
    }
    /// Removes and returns the oldest item, or `None` when the queue is
    /// empty.
    pub fn dequeue(&self) -> Option<T> {
        let mut record = HazardRecord::default();
        loop {
            // Pin the current head node.
            let scope = self.head.0.protect(&self.registry, &mut record);
            let lhead = scope.as_ref().unwrap();
            // Empty fast path (classic FAAArrayQueue check): the head node
            // is drained AND has no successor.
            // FIX: this previously tested `get_dummy(...).is_none()`, which
            // inverted the condition: it (a) could report "empty" while a
            // successor full of items existed, and (b) made dequeue on a
            // truly empty queue spin through all BUFFER_SIZE indices
            // (poisoning every slot) before returning None.
            // NOTE(review): assumes peril's `get_dummy` returns Some(_) when
            // the hazard pointer currently holds a dummy (i.e. no successor)
            // — confirm against the peril documentation.
            if lhead.deqidx.load(Ordering::Acquire) >= lhead.enqidx.load(Ordering::Acquire)
                && lhead.next.get_dummy(Ordering::Acquire).is_some()
            {
                break;
            }
            let idx = lhead.deqidx.fetch_add(1, Ordering::AcqRel);
            if idx > (BUFFER_SIZE - 1) {
                // Head node exhausted: inspect its successor.
                let lnext = {
                    let mut record2 = HazardRecord::default();
                    let scope2 = lhead.next.protect(&self.registry, &mut record2);
                    scope2.clone_value()
                };
                if lnext.is_dummy() {
                    // No successor: queue is empty.
                    break;
                }
                // Help swing the head to the successor, then retry.
                let _ = scope.compare_exchange(lnext, Ordering::AcqRel, Ordering::Relaxed);
                continue;
            }
            // Claim the slot by poisoning it with TAKEN. A null result means
            // the matching enqueuer has not deposited yet — skip this slot.
            let item = lhead.items[idx as usize].swap(TAKEN as *mut T, Ordering::AcqRel);
            if item == std::ptr::null_mut() {
                continue;
            }
            // We now exclusively own the pointer; rebox and return the value.
            let item = unsafe { Box::from_raw(item) };
            return Some(*item);
        }
        None
    }
}