use core::{cell::UnsafeCell, mem::MaybeUninit};
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;
use atomic::Ordering;
use crate::storage::{OwnedStorage, Storage, ViewStorage};
// The `mpmc_large` feature widens the queue's index/sequence type from `u8`
// to `usize`, allowing capacities beyond the `u8` range (see the
// `N < UintSize::MAX` assertion in `Queue::new`).
#[cfg(feature = "mpmc_large")]
type AtomicTargetSize = atomic::AtomicUsize;
#[cfg(not(feature = "mpmc_large"))]
type AtomicTargetSize = atomic::AtomicU8;
// Plain unsigned integer matching `AtomicTargetSize`.
#[cfg(feature = "mpmc_large")]
type UintSize = usize;
#[cfg(not(feature = "mpmc_large"))]
type UintSize = u8;
// Signed counterpart, used for wrapping sequence-vs-position comparisons
// in `enqueue`/`dequeue`.
#[cfg(feature = "mpmc_large")]
type IntSize = isize;
#[cfg(not(feature = "mpmc_large"))]
type IntSize = i8;
/// MPMC queue implementation shared by the owned [`Queue`] and the unsized
/// [`QueueView`]; `S` selects the buffer storage (inline array vs. view).
pub struct QueueInner<T, S: Storage> {
    /// Position of the next slot to read; advances with wrapping arithmetic.
    dequeue_pos: AtomicTargetSize,
    /// Position of the next slot to write; advances with wrapping arithmetic.
    enqueue_pos: AtomicTargetSize,
    /// Ring buffer of sequence-tagged cells. `UnsafeCell` because the cells
    /// are mutated through `&self` in `enqueue`/`dequeue`.
    buffer: UnsafeCell<S::Buffer<Cell<T>>>,
}
/// Owned MPMC queue with an inline buffer of `N` cells.
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
/// Capacity-erased view of a [`Queue`], obtained via `as_view`/`as_mut_view`.
pub type QueueView<T> = QueueInner<T, ViewStorage>;
impl<T, const N: usize> Queue<T, N> {
    /// Creates an empty queue.
    ///
    /// `N` must be a power of two, greater than 1, and strictly less than
    /// `UintSize::MAX`; these requirements are enforced at compile time by
    /// the `const` block below.
    #[deprecated(
        note = "See the documentation of Queue::new() for more information: https://docs.rs/heapless/latest/heapless/mpmc/type.Queue.html#method.new"
    )]
    pub const fn new() -> Self {
        const {
            assert!(N > 1);
            assert!(N.is_power_of_two());
            assert!(N < UintSize::MAX as usize);
        }
        // Each cell's sequence number must start equal to its index:
        // `enqueue` treats `seq == pos` as "slot free for this lap". The
        // array is first filled with `Cell::new(0)` placeholders and then
        // fixed up with a `while` loop, since iterator-based initialization
        // is not available in a `const fn`.
        let mut cell_count = 0;
        let mut result_cells: [Cell<T>; N] = [const { Cell::new(0) }; N];
        while cell_count != N {
            result_cells[cell_count] = Cell::new(cell_count);
            cell_count += 1;
        }
        Self {
            buffer: UnsafeCell::new(result_cells),
            dequeue_pos: AtomicTargetSize::new(0),
            enqueue_pos: AtomicTargetSize::new(0),
        }
    }
    /// Crate-internal coercion to the capacity-erased view.
    pub(crate) fn as_view_private(&self) -> &QueueView<T> {
        self
    }
    /// Mutable counterpart of [`Self::as_view_private`].
    pub(crate) fn as_view_mut_private(&mut self) -> &mut QueueView<T> {
        self
    }
}
impl<T, S: Storage> QueueInner<T, S> {
    /// Returns the maximum number of elements the queue can hold.
    #[inline]
    pub fn capacity(&self) -> usize {
        S::len(self.buffer.get())
    }

    /// Gets a reference to the queue as a capacity-erased [`QueueView`].
    #[inline]
    pub fn as_view(&self) -> &QueueView<T> {
        S::as_mpmc_view(self)
    }

    /// Gets a mutable reference to the queue as a capacity-erased [`QueueView`].
    #[inline]
    pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
        S::as_mpmc_mut_view(self)
    }

    /// Index mask: the capacity is a power of two, so `pos & mask` maps a
    /// monotonically increasing position onto a buffer slot.
    fn mask(&self) -> UintSize {
        (self.capacity() - 1) as UintSize
    }

    /// Returns the item at the front of the queue, or `None` if it is empty.
    pub fn dequeue(&self) -> Option<T> {
        let mask = self.mask();
        unsafe { dequeue(S::as_ptr(self.buffer.get()), &self.dequeue_pos, mask) }
    }

    /// Adds `item` to the back of the queue; returns it back as `Err(item)`
    /// when the queue is full.
    pub fn enqueue(&self, item: T) -> Result<(), T> {
        let mask = self.mask();
        unsafe { enqueue(S::as_ptr(self.buffer.get()), &self.enqueue_pos, mask, item) }
    }
}
impl<T, const N: usize> Default for Queue<T, N> {
fn default() -> Self {
#[allow(deprecated)]
Self::new()
}
}
impl<T, S: Storage> Drop for QueueInner<T, S> {
    fn drop(&mut self) {
        // Pop every remaining element so its destructor runs: the buffer
        // stores `MaybeUninit<T>` cells, which would otherwise silently
        // leak any still-initialized items.
        while self.dequeue().is_some() {}
    }
}
// SAFETY: all cross-thread access to the cells goes through the atomic
// sequence/position protocol in `enqueue`/`dequeue`, so sharing the queue is
// sound provided the element type itself may be sent between threads.
unsafe impl<T, S: Storage> Sync for QueueInner<T, S> where T: Send {}
/// One ring-buffer slot: a possibly-uninitialized payload plus the sequence
/// number that drives the lock-free handoff between producers and consumers.
struct Cell<T> {
    /// Initialized exactly between an `enqueue` write and the matching
    /// `dequeue` read, as tracked by `sequence`.
    data: MaybeUninit<T>,
    sequence: AtomicTargetSize,
}
impl<T> Cell<T> {
    /// Creates an empty cell whose sequence counter starts at `seq`
    /// (truncated to the active index width).
    const fn new(seq: usize) -> Self {
        let sequence = AtomicTargetSize::new(seq as UintSize);
        Self {
            sequence,
            data: MaybeUninit::uninit(),
        }
    }
}
/// Lock-free dequeue (Vyukov-style bounded MPMC): claims the slot at
/// `dequeue_pos` via CAS, reads the payload, then hands the slot back to
/// producers by advancing its sequence number one full lap.
///
/// # Safety
///
/// `buffer` must point to `mask + 1` valid `Cell<T>`s shared with `enqueue`,
/// and `mask` must be one less than that power-of-two capacity.
unsafe fn dequeue<T>(
    buffer: *mut Cell<T>,
    dequeue_pos: &AtomicTargetSize,
    mask: UintSize,
) -> Option<T> {
    let mut pos = dequeue_pos.load(Ordering::Relaxed);
    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        // Signed wrapping difference between the slot's sequence and the
        // value it carries when ready to read (`pos + 1`, set by `enqueue`).
        let dif = (seq as IntSize).wrapping_sub((pos.wrapping_add(1)) as IntSize);
        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // Slot is ready: try to claim it by advancing `dequeue_pos`.
                // A failed (possibly spurious) CAS means another consumer
                // raced us; loop and retry.
                if dequeue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // Slot has not been written for this lap: queue is empty.
                return None;
            }
            core::cmp::Ordering::Greater => {
                // Sequence is ahead of `pos`: another consumer already took
                // this position; reload the current position and retry.
                pos = dequeue_pos.load(Ordering::Relaxed);
            }
        }
    }
    // We own the slot. Read the payload out, then release the slot to
    // producers by bumping its sequence to `pos + mask + 1` (next lap);
    // the Release store pairs with the Acquire load in `enqueue`.
    let data = (*cell).data.as_ptr().read();
    (*cell)
        .sequence
        .store(pos.wrapping_add(mask).wrapping_add(1), Ordering::Release);
    Some(data)
}
/// Lock-free enqueue (Vyukov-style bounded MPMC): claims the slot at
/// `enqueue_pos` via CAS, writes the payload, then publishes it to consumers
/// by advancing the slot's sequence number to `pos + 1`.
///
/// # Safety
///
/// `buffer` must point to `mask + 1` valid `Cell<T>`s shared with `dequeue`,
/// and `mask` must be one less than that power-of-two capacity.
unsafe fn enqueue<T>(
    buffer: *mut Cell<T>,
    enqueue_pos: &AtomicTargetSize,
    mask: UintSize,
    item: T,
) -> Result<(), T> {
    let mut pos = enqueue_pos.load(Ordering::Relaxed);
    let mut cell;
    loop {
        cell = buffer.add(usize::from(pos & mask));
        let seq = (*cell).sequence.load(Ordering::Acquire);
        // A slot free for this lap has `seq == pos` (set by `Queue::new`
        // initially, or by the consumer that last emptied it).
        let dif = (seq as IntSize).wrapping_sub(pos as IntSize);
        match dif.cmp(&0) {
            core::cmp::Ordering::Equal => {
                // Slot is free: try to claim it by advancing `enqueue_pos`.
                // A failed (possibly spurious) CAS means another producer
                // raced us; loop and retry.
                if enqueue_pos
                    .compare_exchange_weak(
                        pos,
                        pos.wrapping_add(1),
                        Ordering::Relaxed,
                        Ordering::Relaxed,
                    )
                    .is_ok()
                {
                    break;
                }
            }
            core::cmp::Ordering::Less => {
                // Slot still holds an unconsumed item from the previous
                // lap: the queue is full. Give the item back to the caller.
                return Err(item);
            }
            core::cmp::Ordering::Greater => {
                // Another producer already claimed this position; reload
                // the current position and retry.
                pos = enqueue_pos.load(Ordering::Relaxed);
            }
        }
    }
    // We own the slot. Write the payload, then publish it by storing the
    // "ready to read" sequence value (`pos + 1`); the Release store pairs
    // with the Acquire load in `dequeue`.
    (*cell).data.as_mut_ptr().write(item);
    (*cell)
        .sequence
        .store(pos.wrapping_add(1), Ordering::Release);
    Ok(())
}
#[cfg(test)]
mod tests {
    use static_assertions::assert_not_impl_any;
    use super::Queue;
    // A queue holding raw pointers (which are not `Send`) must not be `Send`.
    assert_not_impl_any!(Queue<*const (), 4>: Send);
    #[test]
    fn memory_leak() {
        // NOTE(review): `droppable!` presumably defines a `Droppable` type
        // that counts live instances — defined elsewhere in the crate;
        // confirm against the test-support module.
        droppable!();
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        q.enqueue(Droppable::new()).unwrap_or_else(|_| panic!());
        // Dropping the queue must drain (and thus drop) both queued items.
        drop(q);
        assert_eq!(Droppable::count(), 0);
    }
    #[test]
    fn sanity() {
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        q.enqueue(0).unwrap();
        q.enqueue(1).unwrap();
        // Capacity is 2, so a third enqueue must fail.
        assert!(q.enqueue(2).is_err());
        // Items come back out in FIFO order, then the queue reads empty.
        assert_eq!(q.dequeue(), Some(0));
        assert_eq!(q.dequeue(), Some(1));
        assert_eq!(q.dequeue(), None);
    }
    #[test]
    fn drain_at_pos255() {
        // Drives the positions up to 255, the wrap point of the default
        // `u8` index type (without the `mpmc_large` feature).
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        for _ in 0..255 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        assert_eq!(q.dequeue(), None);
    }
    #[test]
    fn full_at_wrapped_pos0() {
        // The queue must still fill completely once the positions have
        // wrapped back around to 0.
        #[expect(deprecated)]
        let q = Queue::<_, 2>::new();
        for _ in 0..254 {
            assert!(q.enqueue(0).is_ok());
            assert_eq!(q.dequeue(), Some(0));
        }
        assert!(q.enqueue(0).is_ok());
        assert!(q.enqueue(0).is_ok());
        assert!(q.enqueue(0).is_err());
    }
    #[test]
    fn enqueue_full() {
        // Capacity sized per the active index type (cf. the
        // `N < UintSize::MAX` assertion in `Queue::new`).
        #[cfg(not(feature = "mpmc_large"))]
        const CAPACITY: usize = 128;
        #[cfg(feature = "mpmc_large")]
        const CAPACITY: usize = 256;
        #[expect(deprecated)]
        let q: Queue<u8, CAPACITY> = Queue::new();
        assert_eq!(q.capacity(), CAPACITY);
        for _ in 0..CAPACITY {
            q.enqueue(0xAA).unwrap();
        }
        // One more than capacity must be rejected.
        q.enqueue(0x55).unwrap_err();
    }
}