use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::atomic::{AtomicUsize, Ordering};
use crate::utils::CacheAligned;
struct Slot<T> {
stamp: AtomicUsize,
value: UnsafeCell<MaybeUninit<T>>,
}
pub struct ArrayQueue<T> {
head: CacheAligned<AtomicUsize>,
tail: CacheAligned<AtomicUsize>,
buffer: Box<[Slot<T>]>,
mask: usize,
}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
impl<T> ArrayQueue<T> {
pub fn new(cap: usize) -> ArrayQueue<T> {
let capacity = if cap < 1 { 1 } else { cap.next_power_of_two() };
let mut buffer = Vec::with_capacity(capacity);
for i in 0..capacity {
buffer.push(Slot {
stamp: AtomicUsize::new(i),
value: UnsafeCell::new(MaybeUninit::uninit()),
});
}
ArrayQueue {
buffer: buffer.into_boxed_slice(),
mask: capacity - 1,
head: CacheAligned::new(AtomicUsize::new(0)),
tail: CacheAligned::new(AtomicUsize::new(0)),
}
}
pub fn push(&self, value: T) -> Result<(), T> {
let backoff = crossbeam_utils::Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
let index = tail & self.mask;
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);
if tail == stamp {
let next = tail + 1;
if self
.tail
.compare_exchange_weak(tail, next, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
} else if tail + 1 > stamp {
let head = self.head.load(Ordering::Relaxed);
if tail >= head + self.buffer.len() {
return Err(value);
}
backoff.snooze();
} else {
backoff.snooze();
}
tail = self.tail.load(Ordering::Relaxed);
}
}
pub fn pop(&self) -> Option<T> {
let backoff = crossbeam_utils::Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
let index = head & self.mask;
let slot = &self.buffer[index];
let stamp = slot.stamp.load(Ordering::Acquire);
if head + 1 == stamp {
let next = head + 1;
if self
.head
.compare_exchange_weak(head, next, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
let value = unsafe { slot.value.get().read().assume_init() };
slot.stamp
.store(head + self.buffer.len(), Ordering::Release);
return Some(value);
}
} else if head == stamp {
let tail = self.tail.load(Ordering::Relaxed);
if tail == head {
return None;
}
backoff.snooze();
} else {
backoff.snooze();
}
head = self.head.load(Ordering::Relaxed);
}
}
pub fn capacity(&self) -> usize {
self.buffer.len()
}
pub fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
head == tail
}
pub fn is_full(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
tail == head + self.buffer.len()
}
}
impl<T> Drop for ArrayQueue<T> {
fn drop(&mut self) {
while self.pop().is_some() {}
}
}