use core::cell::UnsafeCell;
use crate::flavor::Token;
use core::mem::{self, MaybeUninit};
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{self, AtomicUsize, Ordering};
use crossbeam_utils::{Backoff, CachePadded};
/// One cell of the ring buffer.
struct Slot<T> {
/// State marker tied to the `head`/`tail` counters: a slot whose stamp
/// equals the current `tail` is writable; after a push the stamp becomes
/// `tail + 1`, making it readable (see `_try_push` / `_start_read`).
stamp: AtomicUsize,
/// The stored element. Only initialized between a push's `stamp` publish
/// and the matching pop; `MaybeUninit` otherwise.
value: UnsafeCell<MaybeUninit<T>>,
}
/// A bounded FIFO queue backed by a fixed ring buffer of `Slot`s.
///
/// The const flags select the synchronization strategy: `MP` gates a CAS
/// (vs. plain store) on `tail` in the push path and `MC` does the same for
/// `head` in the pop path — presumably "multi-producer" / "multi-consumer";
/// with a flag off, the corresponding side must be a single thread.
pub struct ArrayQueue<T, const MP: bool, const MC: bool> {
// Position of the next element to pop. `CachePadded` keeps the two hot
// counters on separate cache lines to avoid false sharing.
head: CachePadded<AtomicUsize>,
// Position of the next free slot to push into.
tail: CachePadded<AtomicUsize>,
// The `cap` slots; never reallocated after construction.
buffer: Box<[Slot<T>]>,
// Power of two > capacity: low bits of head/tail are the buffer index,
// higher bits count laps around the ring.
one_lap: usize,
}
// SAFETY: the queue moves owned `T` values between threads (a value pushed
// on one thread may be popped — and dropped — on another), so both `Send`
// and `Sync` are sound only when `T: Send`. The previous unbounded impls
// would have let e.g. `ArrayQueue<Rc<_>, _, _>` cross threads. `T: Sync`
// is not required: no `&T` is ever shared between threads.
unsafe impl<T: Send, const MP: bool, const MC: bool> Sync for ArrayQueue<T, MP, MC> {}
unsafe impl<T: Send, const MP: bool, const MC: bool> Send for ArrayQueue<T, MP, MC> {}
// `UnsafeCell` is `!RefUnwindSafe`, so these opt back in: the stamp protocol
// keeps the queue in a consistent state at every publish point.
impl<T, const MP: bool, const MC: bool> UnwindSafe for ArrayQueue<T, MP, MC> {}
impl<T, const MP: bool, const MC: bool> RefUnwindSafe for ArrayQueue<T, MP, MC> {}
impl<T, const MP: bool, const MC: bool> ArrayQueue<T, MP, MC> {
/// Creates a queue with room for exactly `cap` elements.
///
/// Slot `i` starts with stamp `i`, i.e. writable for lap 0. `one_lap`
/// is the smallest power of two strictly greater than `cap`, so the low
/// `one_lap - 1` bits of `head`/`tail` index the buffer and the higher
/// bits count laps.
///
/// # Panics
/// Panics if `cap` is zero.
pub fn new(cap: usize) -> Self {
assert!(cap > 0, "capacity must be non-zero");
let head = 0;
let tail = 0;
let buffer: Box<[Slot<T>]> = (0..cap)
.map(|i| {
Slot { stamp: AtomicUsize::new(i), value: UnsafeCell::new(MaybeUninit::uninit()) }
})
.collect();
let one_lap = (cap + 1).next_power_of_two();
Self {
buffer,
one_lap,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
}
}
/// A single push attempt that never spins or retries.
///
/// Returns `Some(true)` if the pointee was moved into the queue,
/// `Some(false)` if the queue was observed full, and `None` if the one
/// attempt failed for any other reason (lost tail race, or a slot still
/// being vacated by a consumer) — the caller decides whether to retry.
///
/// # Safety
/// `value` must point to a valid `T`. On `Some(true)` ownership of the
/// pointee has moved into the queue, so the caller must not drop or
/// reuse it; on any other return the pointee is untouched.
#[allow(dead_code)]
#[inline(always)]
pub unsafe fn try_push_oneshot(&self, value: *const T) -> Option<bool> {
let tail = self.tail.load(Ordering::SeqCst);
// Expands to an early `return Some(false)` when `head` is exactly one
// lap behind `$tail`, i.e. the buffer already holds `cap` elements.
macro_rules! check_full {
($tail: expr) => {
let head = self.head.load(Ordering::SeqCst);
if head.wrapping_add(self.one_lap) == $tail {
return Some(false);
}
};
}
check_full!(tail);
match self._try_push(tail, value) {
Ok(_) => Some(true),
Err((_stamp, _new_tail)) => {
None
}
}
}
/// Tries to claim the slot addressed by `tail` and write `value` into it.
///
/// On success: advances `tail` (CAS when `MP`, plain store otherwise),
/// copies the pointee into the slot, then publishes it by storing
/// `tail + 1` into the slot's stamp (`Release` pairs with the consumer's
/// `Acquire` stamp load).
///
/// Errors:
/// - `Err((stamp, Some(t)))`: the tail CAS lost a race; `t` is the tail
///   value observed by the failed CAS.
/// - `Err((stamp, None))`: the slot's stamp did not match `tail` — the
///   slot is not yet writable (queue full or a pop still in flight);
///   the caller inspects `stamp` to tell the cases apart.
///
/// NOTE(review): this is a safe `fn` yet reads through the raw `value`
/// pointer; all current callers are `unsafe fn`s, but marking this
/// `unsafe fn` would make the contract explicit.
#[inline]
fn _try_push(&self, tail: usize, value: *const T) -> Result<bool, (usize, Option<usize>)> {
let cap = self.capacity();
// Low bits of `tail` are the buffer index (one_lap is a power of two).
let index = tail & (self.one_lap - 1);
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// Stamp equal to tail means this slot is free for the current lap.
if tail == stamp {
// Next tail: either step within this lap, or wrap the index to 0
// and bump the lap counter.
let new_tail = if index + 1 < cap {
tail + 1
} else {
let lap = tail & !(self.one_lap - 1);
lap.wrapping_add(self.one_lap)
};
if MP {
// Multiple producers: claim the slot by CAS-ing the tail.
if let Err(t) = self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
return Err((stamp, Some(t)));
}
} else {
// Single producer: nobody else moves tail, plain store suffices.
self.tail.store(new_tail, Ordering::SeqCst);
}
// Move the pointee into the claimed slot.
unsafe {
let item: &mut MaybeUninit<T> = &mut *slot.value.get();
item.write(ptr::read(value));
}
// Publish: stamp = tail + 1 marks the slot readable for this lap.
slot.stamp.store(tail + 1, Ordering::Release);
Ok(true)
} else {
Err((stamp, None))
}
}
/// Pushes the pointee, spinning with `Backoff` until it is stored or the
/// queue is observed full.
///
/// Returns `true` once the value is in the queue, `false` if it was full.
///
/// # Safety
/// Same contract as [`Self::try_push_oneshot`]: `value` must point to a
/// valid `T`, and on `true` its ownership has moved into the queue.
#[inline(always)]
pub unsafe fn push_with_ptr(&self, value: *const T) -> bool {
let backoff = Backoff::new();
let mut tail =
if MP { self.tail.load(Ordering::Relaxed) } else { self.tail.load(Ordering::Acquire) };
// Early `return false` when head trails `$tail` by exactly one lap.
// With concurrent peers a SeqCst fence + relaxed load is used in place
// of a SeqCst load (same pattern as upstream crossbeam's ArrayQueue).
macro_rules! check_full {
($tail: expr) => {
let head = if MP || MC {
atomic::fence(Ordering::SeqCst);
self.head.load(Ordering::Relaxed)
} else {
self.head.load(Ordering::SeqCst)
};
if head.wrapping_add(self.one_lap) == $tail {
return false;
}
};
}
loop {
match self._try_push(tail, value) {
Ok(res) => return res,
Err((stamp, new_tail)) => {
// Lost the tail CAS: retry immediately at the fresher tail.
if let Some(_tail) = new_tail {
tail = _tail;
backoff.spin();
continue;
}
// Stamp is one full lap behind `tail + 1`: the slot still
// holds the previous lap's value, so the queue may be full.
if stamp.wrapping_add(self.one_lap) == tail + 1 {
check_full!(tail);
}
// Not full (or a pop is in flight): back off and retry.
backoff.snooze();
if MP {
tail = self.tail.load(Ordering::Relaxed);
}
}
}
}
}
/// Reserves the element at the head for reading.
///
/// On success, `head` has already been advanced; the returned [`Token`]
/// carries the slot's address and the stamp that [`Self::read`] must
/// store once the value has been taken out. Returns `None` if the queue
/// is empty. `final_check` forces a `SeqCst` load of `head` (see
/// `_start_read`).
#[inline]
pub fn start_read(&self, final_check: bool) -> Option<Token> {
if let Some((slot, stamp)) = self._start_read(final_check) {
Some(Token::new(slot as *const Slot<T> as *const u8, stamp))
} else {
None
}
}
/// Removes and returns the element at the head, or `None` if empty.
///
/// Equivalent to `start_read` immediately followed by `read`, without
/// materializing a `Token`.
#[inline]
pub fn pop(&self, final_check: bool) -> Option<T> {
if let Some((slot, stamp)) = self._start_read(final_check) {
// Move the value out, then release the slot to the producers by
// storing the next-lap stamp.
let msg = unsafe { slot.value.get().read().assume_init() };
slot.stamp.store(stamp, Ordering::Release);
Some(msg)
} else {
None
}
}
/// Claims one occupied slot at the head.
///
/// Spins until either a readable slot is claimed — returning the slot
/// and the stamp `head + one_lap` that makes it writable again for the
/// next lap — or the queue is seen empty (`None`). `final_check`
/// strengthens the initial `head` load to `SeqCst`; otherwise `MC`
/// consumers load relaxed (the CAS re-validates) and a single consumer
/// loads `Acquire`.
#[inline]
fn _start_read(&self, final_check: bool) -> Option<(&Slot<T>, usize)> {
let mut head;
if final_check {
head = self.head.load(Ordering::SeqCst);
} else {
let order = if MC { Ordering::Relaxed } else { Ordering::Acquire };
head = self.head.load(order);
}
let backoff = Backoff::new();
loop {
// Low bits of `head` are the buffer index.
let index = head & (self.one_lap - 1);
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
// Stamp == head + 1 means a producer has published this slot.
if head + 1 == stamp {
// Next head: step within the lap, or wrap to the next lap.
let new = if index + 1 < self.capacity() {
head + 1
} else {
let lap = head & !(self.one_lap - 1);
lap.wrapping_add(self.one_lap)
};
if MC {
// Multiple consumers: claim the slot by CAS-ing the head.
if let Err(new_head) = self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
head = new_head;
backoff.spin();
continue;
}
} else {
// Single consumer: plain store suffices.
self.head.store(new, Ordering::SeqCst);
}
// The stamp the reader must store after taking the value:
// marks the slot writable for the next lap.
let new_head = head.wrapping_add(self.one_lap);
return Some((slot, new_head));
} else {
if stamp == head {
// Slot still empty for this lap — the queue may be empty.
// Fence + relaxed load in the concurrent case, as in
// `push_with_ptr`'s full check.
let tail = if MP || MC {
atomic::fence(Ordering::SeqCst);
self.tail.load(Ordering::Relaxed)
} else {
self.tail.load(Ordering::SeqCst)
};
if tail == head {
return None;
}
// A push is in flight; spin until it publishes.
backoff.spin();
} else {
backoff.snooze();
}
if MC {
head = self.head.load(Ordering::Relaxed);
}
continue;
}
}
}
/// Completes a read started by [`Self::start_read`]: moves the value out
/// of the token's slot and releases the slot with the token's stamp.
///
/// NOTE(review): safe `fn`, but soundness relies on `token` coming from
/// `start_read` and being consumed exactly once — consider `unsafe fn`.
#[inline(always)]
pub fn read(&self, token: Token) -> T {
let slot: &Slot<T> = unsafe { &*token.pos.cast::<Slot<T>>() };
let msg = unsafe { slot.value.get().read().assume_init() };
slot.stamp.store(token.stamp, Ordering::Release);
msg
}
/// Returns the fixed capacity chosen at construction.
#[inline]
pub fn capacity(&self) -> usize {
self.buffer.len()
}
/// Returns `true` if the queue was empty at some instant during the
/// call (head == tail). Racy by nature under concurrent use.
#[inline(always)]
pub fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
tail == head
}
/// Returns `true` if the queue was full at some instant during the
/// call (head exactly one lap behind tail). Racy by nature.
#[inline(always)]
pub fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
head.wrapping_add(self.one_lap) == tail
}
/// Returns a snapshot of the number of elements in the queue.
///
/// Retries until `tail` reads the same before and after the `head` load,
/// so the pair is a consistent snapshot.
#[inline]
pub fn len(&self) -> usize {
loop {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
if self.tail.load(Ordering::SeqCst) == tail {
// Compare buffer indices; lap bits disambiguate the
// index-equal case into empty vs. full.
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.capacity() - hix + tix
} else if tail == head {
0
} else {
self.capacity()
};
}
}
}
}
impl<T, const MP: bool, const MC: bool> Drop for ArrayQueue<T, MP, MC> {
    /// Runs the destructor of every element still stored in the ring.
    fn drop(&mut self) {
        // Elements without drop glue need no cleanup at all.
        if !mem::needs_drop::<T>() {
            return;
        }
        // `&mut self` guarantees exclusive access, so the atomic counters
        // can be read directly without synchronization.
        let head = *self.head.get_mut();
        let tail = *self.tail.get_mut();
        let cap = self.capacity();
        let mask = self.one_lap - 1;
        // Buffer indices of the first occupied and first free slot.
        let hix = head & mask;
        let tix = tail & mask;
        // Number of initialized elements currently held.
        let occupied = if hix < tix {
            tix - hix
        } else if hix > tix {
            cap - hix + tix
        } else if tail == head {
            0
        } else {
            cap
        };
        // Walk the occupied region, wrapping at the end of the buffer.
        for offset in 0..occupied {
            let index = (hix + offset) % cap;
            debug_assert!(index < self.buffer.len());
            // SAFETY: `index` lies within the occupied region, so the slot
            // holds an initialized value that is dropped exactly once.
            unsafe {
                let cell = self.buffer.get_unchecked_mut(index);
                (*cell.value.get()).assume_init_drop();
            }
        }
    }
}