use core::cell::UnsafeCell;
use core::mem::{self, MaybeUninit};
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::ptr;
use core::sync::atomic::{AtomicU64, Ordering};

use crossbeam_utils::CachePadded;

use crate::flavor::Token;
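/// A single storage cell; the producer writes it, the consumer moves out of
/// it, with `MaybeUninit` tracking initialization manually.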
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
}
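/// A bounded single-producer single-consumer queue using crossbeam-style
/// lap-stamped positions. Each endpoint owns one cache-padded `AtomicU64`
/// packing its own position in the low 32 bits and a cached copy of the
/// peer's position in the high 32 bits, so the hot paths touch the peer's
/// cache line only when the cache says the queue looks full (push) or empty
/// (pop). `one_lap` is the smallest power of two strictly greater than the
/// capacity; the low bits of a position select a slot, the bits above count
/// laps.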
pub struct ArrayQueueSpsc<T> {
sender: CachePadded<AtomicU64>,
recv: CachePadded<AtomicU64>,
buffer: Box<[Slot<T>]>,
one_lap: u32,
}
// Values of `T` cross the producer/consumer thread boundary, so both impls
// need `T: Send`; without the bound, e.g. `ArrayQueueSpsc<Rc<u8>>` would be
// unsoundly `Send`.
unsafe impl<T: Send> Sync for ArrayQueueSpsc<T> {}
unsafe impl<T: Send> Send for ArrayQueueSpsc<T> {}
impl<T> UnwindSafe for ArrayQueueSpsc<T> {}
impl<T> RefUnwindSafe for ArrayQueueSpsc<T> {}
impl<T> ArrayQueueSpsc<T> {
pub fn new(cap: usize) -> Self {
assert!(cap > 0, "capacity must be non-zero");
assert!(cap < (1 << 31), "capacity too large for u32 logic");
        // Head and tail both start at slot 0 on lap 0: the queue is empty.
        let head: u32 = 0;
        let tail: u32 = 0;
        let buffer: Box<[Slot<T>]> = (0..cap)
            .map(|_| Slot { value: UnsafeCell::new(MaybeUninit::uninit()) })
            .collect();
        // Smallest power of two strictly greater than `cap`: the low bits of
        // a position select the slot, everything above is the lap stamp.
        let one_lap = (cap + 1).next_power_of_two() as u32;
Self {
buffer,
one_lap,
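            // recv packs (cached tail << 32) | head;
            // sender packs (cached head << 32) | tail.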
recv: CachePadded::new(AtomicU64::new(((tail as u64) << 32) | (head as u64))),
sender: CachePadded::new(AtomicU64::new(((head as u64) << 32) | (tail as u64))),
}
}
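    // Core push path. `value` must point to a valid `T`; ownership moves into
    // the queue on success (the public wrappers are `unsafe` to enforce this).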
#[inline(always)]
fn _try_push(&self, order: Ordering, value: *const T) -> bool {
let sender_val = self.sender.load(Ordering::Relaxed);
let tail = sender_val as u32;
let mut head_cached = (sender_val >> 32) as u32;
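        // Queue looks full based on the cached head? Reload the real head
        // from the consumer's atomic before giving up.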
if head_cached.wrapping_add(self.one_lap) == tail {
let head = self.recv.load(order) as u32;
if head == head_cached {
return false;
}
head_cached = head;
}
let cap = self.capacity();
let index = (tail & (self.one_lap - 1)) as usize;
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
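        // Advance within the current lap, or wrap to slot 0 of the next lap.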
let new_tail = if index + 1 < cap {
tail + 1
} else {
let lap = tail & !(self.one_lap - 1);
lap.wrapping_add(self.one_lap)
};
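        // Move the value in, then publish the new tail together with the
        // refreshed head cache in a single packed store.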
unsafe {
let item: &mut MaybeUninit<T> = &mut *slot.value.get();
item.write(ptr::read(value));
}
self.sender.store(((head_cached as u64) << 32) | (new_tail as u64), Ordering::SeqCst);
true
}
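    /// Attempts to enqueue the value behind `value`, reloading the consumer
    /// index with `Acquire` when the queue looks full.
    ///
    /// # Safety
    /// `value` must point to a valid `T`. On success the value is moved into
    /// the queue with `ptr::read`, so the caller must forget or otherwise
    /// relinquish the original to avoid a double drop.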
#[inline(always)]
pub unsafe fn push_with_ptr(&self, value: *const T) -> bool {
self._try_push(Ordering::Acquire, value)
}
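    /// Like [`push_with_ptr`](Self::push_with_ptr), but reloads the consumer
    /// index with `SeqCst` for a final full check.
    ///
    /// # Safety
    /// Same contract as [`push_with_ptr`](Self::push_with_ptr).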
#[inline(always)]
pub unsafe fn push_with_ptr_final(&self, value: *const T) -> bool {
self._try_push(Ordering::SeqCst, value)
}
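    /// Reserves the element at the head without removing it: the returned
    /// token carries the slot pointer and the packed recv word (stashed in
    /// `Token::stamp`, which assumes a 64-bit `usize`), and is later consumed
    /// by [`read`](Self::read) to move the value out and advance the head.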
#[inline]
pub fn start_read(&self, final_check: bool) -> Option<Token> {
if let Some((head, tail_cached)) = self._start_read::<true>(final_check) {
let (slot, packed_recv) = self._read(head, tail_cached);
Some(Token::new(slot as *const Slot<T> as *const u8, packed_recv as usize))
} else {
None
}
}
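    /// Pops the head element. If the cached tail says empty, the producer
    /// index is reloaded (`SeqCst` when `final_check` is set, `Acquire`
    /// otherwise) before reporting `None`.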
#[inline]
pub fn pop(&self, final_check: bool) -> Option<T> {
if let Some((head, tail_cached)) = self._start_read::<true>(final_check) {
let (slot, packed_recv) = self._read(head, tail_cached);
let msg = unsafe { slot.value.get().read().assume_init() };
self.recv.store(packed_recv, Ordering::SeqCst);
Some(msg)
} else {
None
}
}
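    /// Pops using only the locally cached tail. Never touches the producer's
    /// cache line, so it may report empty while a push is already visible to
    /// a stronger check.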
#[inline]
pub fn pop_cached(&self) -> Option<T> {
if let Some((head, tail_cached)) = self._start_read::<false>(false) {
let (slot, packed_recv) = self._read(head, tail_cached);
let msg = unsafe { slot.value.get().read().assume_init() };
self.recv.store(packed_recv, Ordering::SeqCst);
Some(msg)
} else {
None
}
}
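    // Core pop path. `SPIN` selects whether an apparently empty queue is
    // re-checked against the producer's atomic (`final_check` upgrades that
    // reload to `SeqCst`); with `SPIN = false` only the cached tail is
    // consulted. Returns `(head, cached_tail)` when an element is available.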
#[inline]
    fn _start_read<const SPIN: bool>(&self, final_check: bool) -> Option<(u32, u32)> {
let recv_val = self.recv.load(Ordering::Relaxed);
let head = recv_val as u32;
let mut tail_cached = (recv_val >> 32) as u32;
if tail_cached == head {
if SPIN {
                core::hint::spin_loop();
                let tail = if final_check {
                    self.sender.load(Ordering::SeqCst)
                } else {
                    self.sender.load(Ordering::Acquire)
                } as u32;
if head == tail {
return None;
}
tail_cached = tail;
} else {
return None;
}
}
Some((head, tail_cached))
}
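    // Computes the slot for `head` plus the packed recv word that advances
    // the head by one (wrapping to the next lap at the end of the buffer).
    // Publishing that word is left to the caller, which lets `start_read`
    // defer the store until `read` has moved the value out.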
#[inline]
fn _read(&self, head: u32, tail_cached: u32) -> (&Slot<T>, u64) {
let index = (head & (self.one_lap - 1)) as usize;
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let new_head = if index + 1 < self.capacity() {
head + 1
} else {
let lap = head & !(self.one_lap - 1);
lap.wrapping_add(self.one_lap)
};
(slot, ((tail_cached as u64) << 32) | (new_head as u64))
}
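    /// Completes a read begun by [`start_read`](Self::start_read): moves the
    /// value out of the token's slot, then publishes the advanced head. The
    /// token must come from `start_read` on this queue and be used at most
    /// once, or the same element would be moved out twice.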
#[inline(always)]
pub fn read(&self, token: Token) -> T {
let slot: &Slot<T> = unsafe { &*token.pos.cast::<Slot<T>>() };
let msg = unsafe { slot.value.get().read().assume_init() };
self.recv.store(token.stamp as u64, Ordering::SeqCst);
msg
}
#[inline]
pub fn capacity(&self) -> usize {
self.buffer.len()
}
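    /// Empty iff head and tail coincide exactly (same slot and same lap).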
#[inline(always)]
pub fn is_empty(&self) -> bool {
let head = self.recv.load(Ordering::SeqCst) as u32;
let tail = self.sender.load(Ordering::SeqCst) as u32;
tail == head
}
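    /// Full iff the tail position is exactly one lap ahead of the head.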
#[inline(always)]
pub fn is_full(&self) -> bool {
let tail = self.sender.load(Ordering::SeqCst) as u32;
let head = self.recv.load(Ordering::SeqCst) as u32;
head.wrapping_add(self.one_lap) == tail
}
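    /// Snapshot length: retries until `sender` is unchanged across the head
    /// load, then derives the count from the two slot indices and the
    /// lap-stamped positions.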
#[inline]
pub fn len(&self) -> usize {
loop {
let tail = self.sender.load(Ordering::SeqCst) as u32;
let head = self.recv.load(Ordering::SeqCst) as u32;
if self.sender.load(Ordering::SeqCst) as u32 == tail {
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
return if hix < tix {
(tix - hix) as usize
} else if hix > tix {
self.capacity() - (hix - tix) as usize
} else if tail == head {
0
} else {
self.capacity()
};
}
}
}
}
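// Any elements still between head and tail were pushed but never popped; they
// are the only initialized slots and must be dropped in place.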
impl<T> Drop for ArrayQueueSpsc<T> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
let head = (*self.recv.get_mut()) as u32;
let tail = (*self.sender.get_mut()) as u32;
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
let len = if hix < tix {
tix - hix
} else if hix > tix {
self.capacity() as u32 - hix + tix
} else if tail == head {
0
} else {
self.capacity() as u32
};
for i in 0..(len as usize) {
let index = if (hix as usize) + i < self.capacity() {
(hix as usize) + i
} else {
(hix as usize) + i - self.capacity()
};
unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
(*slot.value.get()).assume_init_drop();
}
}
}
}
}
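// A minimal single-threaded usage sketch (not part of the original source;
// the module and test names are illustrative, and it assumes tests are built
// with `std` for the harness). It exercises the ownership contract of
// `push_with_ptr`: on success the queue takes the value, so the caller must
// forget the original to avoid a double drop.
#[cfg(test)]
mod spsc_smoke {
    use super::*;

    #[test]
    fn push_pop_roundtrip() {
        let q = ArrayQueueSpsc::<String>::new(2);
        assert!(q.is_empty());
        let v = String::from("hello");
        // Safety: `v` is a valid `String`; ownership moves into the queue on
        // success, and we relinquish the original below with `mem::forget`.
        assert!(unsafe { q.push_with_ptr(&v as *const String) });
        core::mem::forget(v);
        assert_eq!(q.len(), 1);
        assert_eq!(q.pop(true).as_deref(), Some("hello"));
        assert!(q.pop(true).is_none() && q.is_empty());
    }
}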