use core::{cell::UnsafeCell, mem::MaybeUninit};
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;
use atomic::Ordering;
/// Atomic integer used for the queue's position counters and for each cell's
/// sequence number: `AtomicUsize` when the `mpmc_large` feature is enabled.
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
/// Atomic integer used for the queue's position counters and for each cell's
/// sequence number: `AtomicU8` when the `mpmc_large` feature is disabled.
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;
/// Plain integer counterpart of `AtomicTargetSize` (`usize` with `mpmc_large`).
#[cfg(feature = "mpmc_large")]
type IntSize = usize;
/// Plain integer counterpart of `AtomicTargetSize` (`u8` without `mpmc_large`).
#[cfg(not(feature = "mpmc_large"))]
type IntSize = u8;
/// [`MpMcQueue`] backed by 2 cells.
pub type Q2<T> = MpMcQueue<T, 2>;
/// [`MpMcQueue`] backed by 4 cells.
pub type Q4<T> = MpMcQueue<T, 4>;
/// [`MpMcQueue`] backed by 8 cells.
pub type Q8<T> = MpMcQueue<T, 8>;
/// [`MpMcQueue`] backed by 16 cells.
pub type Q16<T> = MpMcQueue<T, 16>;
/// [`MpMcQueue`] backed by 32 cells.
pub type Q32<T> = MpMcQueue<T, 32>;
/// [`MpMcQueue`] backed by 64 cells.
pub type Q64<T> = MpMcQueue<T, 64>;
/// Fixed-size queue of `N` cells that is operated through shared references
/// (`enqueue` and `dequeue` both take `&self`).
///
/// Every cell carries its own atomic sequence number; the two position
/// counters below hand out the next cell to write to and the next cell to
/// read from.
///
/// NOTE(review): no `Drop` impl is visible in this file, so items that are
/// still queued when the queue itself is dropped appear to be leaked rather
/// than dropped — confirm whether that is intended.
pub struct MpMcQueue<T, const N: usize> {
    // Storage for the `N` cells; wrapped in `UnsafeCell` because cells are
    // written through `&self`.
    buffer: UnsafeCell<[Cell<T>; N]>,
    // Position of the next cell to read from (masked to get the index).
    dequeue_pos: AtomicTargetSize,
    // Position of the next cell to write to (masked to get the index).
    enqueue_pos: AtomicTargetSize,
}
impl<T, const N: usize> MpMcQueue<T, N> {
    /// `N - 1`; AND-ing a position counter with it yields a buffer index.
    const MASK: IntSize = (N - 1) as IntSize;
    /// Placeholder used to fill the array before each slot receives its own
    /// starting sequence number in [`MpMcQueue::new`].
    const EMPTY_CELL: Cell<T> = Cell::new(0);

    /// One-element array used as an assertion: indexing it with `1` (a `true`
    /// condition cast to `usize`) is out of bounds.
    const ASSERT: [(); 1] = [()];

    /// Creates an empty queue.
    ///
    /// `N` is checked by two helpers from `crate::sealed` (not shown in this
    /// file; judging by their names they require `N > 1` and a power of two)
    /// and must additionally be smaller than `IntSize::MAX`.
    pub const fn new() -> Self {
        crate::sealed::greater_than_1::<N>();
        crate::sealed::power_of_two::<N>();

        // Out-of-bounds index (and therefore a failure) when `N` does not fit
        // below the counter type's maximum.
        Self::ASSERT[(N >= (IntSize::MAX as usize)) as usize];

        // Slot `i` starts with sequence number `i`, which marks it as free for
        // the producer holding position `i`.
        let mut cells: [Cell<T>; N] = [Self::EMPTY_CELL; N];
        let mut slot = 0;
        while slot < N {
            cells[slot] = Cell::new(slot);
            slot += 1;
        }

        Self {
            buffer: UnsafeCell::new(cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }

    /// Removes and returns the item at the front of the queue, or `None` when
    /// there is nothing to take.
    pub fn dequeue(&self) -> Option<T> {
        let first_cell = self.buffer.get().cast::<Cell<T>>();
        // SAFETY: `first_cell` points at the `N` cells owned by `self`, and
        // `MASK` is `N - 1`, so every masked index stays inside that array.
        unsafe { dequeue(first_cell, &self.dequeue_pos, Self::MASK) }
    }

    /// Appends `item` to the back of the queue.
    ///
    /// Hands the item back as `Err(item)` when no cell is free.
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        let first_cell = self.buffer.get().cast::<Cell<T>>();
        // SAFETY: `first_cell` points at the `N` cells owned by `self`, and
        // `MASK` is `N - 1`, so every masked index stays inside that array.
        unsafe { enqueue(first_cell, &self.enqueue_pos, Self::MASK, item) }
    }
}
impl<T, const N: usize> Default for MpMcQueue<T, N> {
fn default() -> Self {
Self::new()
}
}
// SAFETY: through a shared reference other threads can only call `enqueue`
// and `dequeue`, which move whole `T` values in and out (hence `T: Send`) and
// claim a cell via the atomic position/sequence counters before touching it.
unsafe impl<T: Send, const N: usize> Sync for MpMcQueue<T, N> {}
/// One slot of the queue's buffer.
struct Cell<T> {
    // Payload; only initialised between an `enqueue` write and the matching
    // `dequeue` read.
    data: MaybeUninit<T>,
    // Ticket that `enqueue`/`dequeue` compare against their position to decide
    // whether this slot is ready for them.
    sequence: AtomicTargetSize,
}
impl<T> Cell<T> {
const fn new(seq: usize) -> Self {
Self {
data: MaybeUninit::uninit(),
sequence: AtomicTargetSize::new(seq as IntSize),
}
}
}
unsafe fn dequeue<T>(
buffer: *mut Cell<T>,
dequeue_pos: &AtomicTargetSize,
mask: IntSize,
) -> Option<T> {
let mut pos = dequeue_pos.load(Ordering::Relaxed);
let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub((pos.wrapping_add(1)) as i8);
if dif == 0 {
if dequeue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
} else if dif < 0 {
return None;
} else {
pos = dequeue_pos.load(Ordering::Relaxed);
}
}
let data = (*cell).data.as_ptr().read();
(*cell)
.sequence
.store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
Some(data)
}
unsafe fn enqueue<T>(
buffer: *mut Cell<T>,
enqueue_pos: &AtomicTargetSize,
mask: IntSize,
item: T,
) -> Result<(), T> {
let mut pos = enqueue_pos.load(Ordering::Relaxed);
let mut cell;
loop {
cell = buffer.add(usize::from(pos & mask));
let seq = (*cell).sequence.load(Ordering::Acquire);
let dif = (seq as i8).wrapping_sub(pos as i8);
if dif == 0 {
if enqueue_pos
.compare_exchange_weak(
pos,
pos.wrapping_add(1),
Ordering::Relaxed,
Ordering::Relaxed,
)
.is_ok()
{
break;
}
} else if dif < 0 {
return Err(item);
} else {
pos = enqueue_pos.load(Ordering::Relaxed);
}
}
(*cell).data.as_mut_ptr().write(item);
(*cell)
.sequence
.store(pos.wrapping_add(1), Ordering::Release);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::Q2;

    /// Pushes one item and pops it again, `rounds` times in a row. Every round
    /// advances both position counters by one and leaves the queue empty.
    fn round_trips(queue: &Q2<i32>, rounds: usize) {
        for _ in 0..rounds {
            assert!(queue.enqueue(0).is_ok());
            assert_eq!(queue.dequeue(), Some(0));
        }
    }

    /// Fill, overflow, drain and underflow a two-slot queue.
    #[test]
    fn sanity() {
        let queue: Q2<i32> = Q2::new();

        queue.enqueue(0).unwrap();
        queue.enqueue(1).unwrap();
        assert!(queue.enqueue(2).is_err());

        assert_eq!(queue.dequeue(), Some(0));
        assert_eq!(queue.dequeue(), Some(1));
        assert_eq!(queue.dequeue(), None);
    }

    /// After 255 round trips the dequeue position is 255; an empty queue must
    /// still answer `None` there instead of looping.
    #[test]
    fn drain_at_pos255() {
        let queue: Q2<i32> = Q2::new();
        round_trips(&queue, 255);
        assert_eq!(queue.dequeue(), None);
    }

    /// After 254 round trips two more pushes fill the queue while the enqueue
    /// position reaches 256; the third push must be rejected instead of
    /// looping.
    #[test]
    fn full_at_wrapped_pos0() {
        let queue: Q2<i32> = Q2::new();
        round_trips(&queue, 254);
        assert!(queue.enqueue(0).is_ok());
        assert!(queue.enqueue(0).is_ok());
        assert!(queue.enqueue(0).is_err());
    }
}