use alloc::boxed::Box;
use core::cell::UnsafeCell;
use core::fmt;
use core::mem::{self, MaybeUninit};
use core::panic::{RefUnwindSafe, UnwindSafe};
use core::sync::atomic::{self, AtomicUsize, Ordering};
use crossbeam_utils::{Backoff, CachePadded};
struct Slot<T> {
stamp: AtomicUsize,
value: UnsafeCell<MaybeUninit<T>>,
}
pub struct ArrayQueue<T> {
head: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicUsize>,
buffer: Box<[Slot<T>]>,
cap: usize,
one_lap: usize,
}
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
impl<T> UnwindSafe for ArrayQueue<T> {}
impl<T> RefUnwindSafe for ArrayQueue<T> {}
impl<T> ArrayQueue<T> {
pub fn new(cap: usize) -> ArrayQueue<T> {
assert!(cap > 0, "capacity must be non-zero");
let head = 0;
let tail = 0;
let buffer: Box<[Slot<T>]> = (0..cap)
.map(|i| {
Slot {
stamp: AtomicUsize::new(i),
value: UnsafeCell::new(MaybeUninit::uninit()),
}
})
.collect();
let one_lap = (cap + 1).next_power_of_two();
ArrayQueue {
buffer,
cap,
one_lap,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
}
}
fn push_or_else<F>(&self, mut value: T, f: F) -> Result<(), T>
where
F: Fn(T, usize, usize, &Slot<T>) -> Result<T, T>,
{
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);
let new_tail = if index + 1 < self.cap {
tail + 1
} else {
lap.wrapping_add(self.one_lap)
};
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
if tail == stamp {
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
unsafe {
slot.value.get().write(MaybeUninit::new(value));
}
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
value = f(value, tail, new_tail, slot)?;
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
pub fn push(&self, value: T) -> Result<(), T> {
self.push_or_else(value, |v, tail, _, _| {
let head = self.head.load(Ordering::Relaxed);
if head.wrapping_add(self.one_lap) == tail {
Err(v)
} else {
Ok(v)
}
})
}
pub fn force_push(&self, value: T) -> Option<T> {
self.push_or_else(value, |v, tail, new_tail, slot| {
let head = tail.wrapping_sub(self.one_lap);
let new_head = new_tail.wrapping_sub(self.one_lap);
if self
.head
.compare_exchange_weak(head, new_head, Ordering::SeqCst, Ordering::Relaxed)
.is_ok()
{
self.tail.store(new_tail, Ordering::SeqCst);
let old = unsafe { slot.value.get().replace(MaybeUninit::new(v)).assume_init() };
slot.stamp.store(tail + 1, Ordering::Release);
Err(old)
} else {
Ok(v)
}
})
.err()
}
pub fn pop(&self) -> Option<T> {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
let index = head & (self.one_lap - 1);
let lap = head & !(self.one_lap - 1);
debug_assert!(index < self.buffer.len());
let slot = unsafe { self.buffer.get_unchecked(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
if head + 1 == stamp {
let new = if index + 1 < self.cap {
head + 1
} else {
lap.wrapping_add(self.one_lap)
};
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
let msg = unsafe { slot.value.get().read().assume_init() };
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Some(msg);
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.load(Ordering::Relaxed);
if tail == head {
return None;
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
pub fn capacity(&self) -> usize {
self.cap
}
pub fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
tail == head
}
pub fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
head.wrapping_add(self.one_lap) == tail
}
pub fn len(&self) -> usize {
loop {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if tail == head {
0
} else {
self.cap
};
}
}
}
}
impl<T> Drop for ArrayQueue<T> {
fn drop(&mut self) {
if mem::needs_drop::<T>() {
let head = *self.head.get_mut();
let tail = *self.tail.get_mut();
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
let len = if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if tail == head {
0
} else {
self.cap
};
for i in 0..len {
let index = if hix + i < self.cap {
hix + i
} else {
hix + i - self.cap
};
unsafe {
debug_assert!(index < self.buffer.len());
let slot = self.buffer.get_unchecked_mut(index);
(*slot.value.get()).assume_init_drop();
}
}
}
}
}
impl<T> fmt::Debug for ArrayQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.pad("ArrayQueue { .. }")
}
}
impl<T> IntoIterator for ArrayQueue<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(self) -> Self::IntoIter {
IntoIter { value: self }
}
}
#[derive(Debug)]
pub struct IntoIter<T> {
value: ArrayQueue<T>,
}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
let value = &mut self.value;
let head = *value.head.get_mut();
if value.head.get_mut() != value.tail.get_mut() {
let index = head & (value.one_lap - 1);
let lap = head & !(value.one_lap - 1);
let val = unsafe {
debug_assert!(index < value.buffer.len());
let slot = value.buffer.get_unchecked_mut(index);
slot.value.get().read().assume_init()
};
let new = if index + 1 < value.cap {
head + 1
} else {
lap.wrapping_add(value.one_lap)
};
*value.head.get_mut() = new;
Option::Some(val)
} else {
Option::None
}
}
}