use core::{borrow::Borrow, cell::UnsafeCell, fmt, hash, mem::MaybeUninit, ptr};
#[cfg(not(feature = "portable-atomic"))]
use core::sync::atomic;
#[cfg(feature = "portable-atomic")]
use portable_atomic as atomic;
use atomic::{AtomicUsize, Ordering};
use crate::storage::{OwnedStorage, Storage, ViewStorage};
pub struct QueueInner<T, S: Storage> {
pub(crate) head: AtomicUsize,
pub(crate) tail: AtomicUsize,
pub(crate) buffer: S::Buffer<UnsafeCell<MaybeUninit<T>>>,
}
pub type Queue<T, const N: usize> = QueueInner<T, OwnedStorage<N>>;
pub type QueueView<T> = QueueInner<T, ViewStorage>;
impl<T, const N: usize> Queue<T, N> {
pub const fn new() -> Self {
const {
assert!(N > 1);
}
Queue {
head: AtomicUsize::new(0),
tail: AtomicUsize::new(0),
buffer: [const { UnsafeCell::new(MaybeUninit::uninit()) }; N],
}
}
pub(crate) fn as_view_private(&self) -> &QueueView<T> {
self
}
pub(crate) fn as_mut_view_private(&mut self) -> &mut QueueView<T> {
self
}
}
impl<T, S: Storage> QueueInner<T, S> {
pub fn as_view(&self) -> &QueueView<T> {
S::as_queue_view(self)
}
pub fn as_mut_view(&mut self) -> &mut QueueView<T> {
S::as_mut_queue_view(self)
}
#[inline]
fn increment(&self, val: usize) -> usize {
(val + 1) % self.n()
}
#[inline]
fn n(&self) -> usize {
self.buffer.borrow().len()
}
#[inline]
pub fn capacity(&self) -> usize {
self.n() - 1
}
#[inline]
pub fn len(&self) -> usize {
let current_head = self.head.load(Ordering::Relaxed);
let current_tail = self.tail.load(Ordering::Relaxed);
current_tail
.wrapping_sub(current_head)
.wrapping_add(self.n())
% self.n()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
}
#[inline]
pub fn is_full(&self) -> bool {
self.increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
}
pub fn iter(&self) -> Iter<'_, T> {
Iter {
rb: self.as_view(),
index: 0,
len: self.len(),
}
}
pub fn iter_mut(&mut self) -> IterMut<'_, T> {
let len = self.len();
IterMut {
rb: self.as_view(),
index: 0,
len,
}
}
#[inline]
pub fn enqueue(&mut self, item: T) -> Result<(), T> {
unsafe { self.inner_enqueue(item) }
}
#[inline]
pub fn dequeue(&mut self) -> Option<T> {
unsafe { self.inner_dequeue() }
}
pub fn peek(&self) -> Option<&T> {
if self.is_empty() {
None
} else {
let head = self.head.load(Ordering::Relaxed);
Some(unsafe { &*(self.buffer.borrow().get_unchecked(head).get() as *const T) })
}
}
unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
let current_tail = self.tail.load(Ordering::Relaxed);
let next_tail = self.increment(current_tail);
if next_tail == self.head.load(Ordering::Acquire) {
Err(val)
} else {
(self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
self.tail.store(next_tail, Ordering::Release);
Ok(())
}
}
unsafe fn inner_enqueue_unchecked(&self, val: T) {
let current_tail = self.tail.load(Ordering::Relaxed);
(self.buffer.borrow().get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
self.tail
.store(self.increment(current_tail), Ordering::Release);
}
pub unsafe fn enqueue_unchecked(&mut self, item: T) {
self.inner_enqueue_unchecked(item);
}
unsafe fn inner_dequeue(&self) -> Option<T> {
let current_head = self.head.load(Ordering::Relaxed);
if current_head == self.tail.load(Ordering::Acquire) {
None
} else {
let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
self.head
.store(self.increment(current_head), Ordering::Release);
Some(v)
}
}
unsafe fn inner_dequeue_unchecked(&self) -> T {
let current_head = self.head.load(Ordering::Relaxed);
let v = (self.buffer.borrow().get_unchecked(current_head).get() as *const T).read();
self.head
.store(self.increment(current_head), Ordering::Release);
v
}
pub unsafe fn dequeue_unchecked(&mut self) -> T {
self.inner_dequeue_unchecked()
}
pub fn split(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
(
Producer { rb: self.as_view() },
Consumer { rb: self.as_view() },
)
}
}
impl<T, const N: usize> Queue<T, N> {
pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
(Producer { rb: self }, Consumer { rb: self })
}
}
impl<T> QueueView<T> {
pub const fn split_const(&mut self) -> (Producer<'_, T>, Consumer<'_, T>) {
(Producer { rb: self }, Consumer { rb: self })
}
}
impl<T, const N: usize> Default for Queue<T, N> {
fn default() -> Self {
Self::new()
}
}
impl<T, const N: usize> Clone for Queue<T, N>
where
T: Clone,
{
fn clone(&self) -> Self {
let mut new: Self = Self::new();
for s in self.iter() {
unsafe {
new.enqueue_unchecked(s.clone());
}
}
new
}
}
impl<T, S, S2> PartialEq<QueueInner<T, S2>> for QueueInner<T, S>
where
T: PartialEq,
S: Storage,
S2: Storage,
{
fn eq(&self, other: &QueueInner<T, S2>) -> bool {
self.len() == other.len() && self.iter().zip(other.iter()).all(|(v1, v2)| v1 == v2)
}
}
impl<T, S: Storage> Eq for QueueInner<T, S> where T: Eq {}
pub struct Iter<'a, T> {
rb: &'a QueueView<T>,
index: usize,
len: usize,
}
impl<T> Clone for Iter<'_, T> {
fn clone(&self) -> Self {
Self {
rb: self.rb,
index: self.index,
len: self.len,
}
}
}
pub struct IterMut<'a, T> {
rb: &'a QueueView<T>,
index: usize,
len: usize,
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);
let i = (head + self.index) % self.rb.n();
self.index += 1;
Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
} else {
None
}
}
}
impl<'a, T> Iterator for IterMut<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);
let i = (head + self.index) % self.rb.n();
self.index += 1;
Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
} else {
None
}
}
}
impl<T> DoubleEndedIterator for Iter<'_, T> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);
let i = (head + self.len - 1) % self.rb.n();
self.len -= 1;
Some(unsafe { &*(self.rb.buffer.borrow().get_unchecked(i).get() as *const T) })
} else {
None
}
}
}
impl<T> DoubleEndedIterator for IterMut<'_, T> {
fn next_back(&mut self) -> Option<Self::Item> {
if self.index < self.len {
let head = self.rb.head.load(Ordering::Relaxed);
let i = (head + self.len - 1) % self.rb.n();
self.len -= 1;
Some(unsafe { &mut *self.rb.buffer.borrow().get_unchecked(i).get().cast::<T>() })
} else {
None
}
}
}
impl<T, S: Storage> Drop for QueueInner<T, S> {
fn drop(&mut self) {
for item in self {
unsafe {
ptr::drop_in_place(item);
}
}
}
}
impl<T, S> fmt::Debug for QueueInner<T, S>
where
T: fmt::Debug,
S: Storage,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_list().entries(self.iter()).finish()
}
}
impl<T, S> hash::Hash for QueueInner<T, S>
where
T: hash::Hash,
S: Storage,
{
fn hash<H: hash::Hasher>(&self, state: &mut H) {
for t in self.iter() {
hash::Hash::hash(t, state);
}
}
}
impl<'a, T, S: Storage> IntoIterator for &'a QueueInner<T, S> {
type Item = &'a T;
type IntoIter = Iter<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a, T, S: Storage> IntoIterator for &'a mut QueueInner<T, S> {
type Item = &'a mut T;
type IntoIter = IterMut<'a, T>;
fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
pub struct Consumer<'a, T> {
rb: &'a QueueView<T>,
}
unsafe impl<T> Send for Consumer<'_, T> where T: Send {}
pub struct Producer<'a, T> {
rb: &'a QueueView<T>,
}
unsafe impl<T> Send for Producer<'_, T> where T: Send {}
impl<T> Consumer<'_, T> {
#[inline]
pub fn dequeue(&mut self) -> Option<T> {
unsafe { self.rb.inner_dequeue() }
}
#[inline]
pub unsafe fn dequeue_unchecked(&mut self) -> T {
self.rb.inner_dequeue_unchecked()
}
#[inline]
pub fn ready(&self) -> bool {
!self.rb.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.rb.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn capacity(&self) -> usize {
self.rb.capacity()
}
#[inline]
pub fn peek(&self) -> Option<&T> {
self.rb.peek()
}
}
impl<T> Producer<'_, T> {
#[inline]
pub fn enqueue(&mut self, item: T) -> Result<(), T> {
unsafe { self.rb.inner_enqueue(item) }
}
#[inline]
pub unsafe fn enqueue_unchecked(&mut self, item: T) {
self.rb.inner_enqueue_unchecked(item);
}
#[inline]
pub fn ready(&self) -> bool {
!self.rb.is_full()
}
#[inline]
pub fn len(&self) -> usize {
self.rb.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.len() == 0
}
#[inline]
pub fn capacity(&self) -> usize {
self.rb.capacity()
}
}
#[cfg(test)]
mod tests {
use std::hash::{Hash, Hasher};
use super::{Consumer, Producer, Queue};
use static_assertions::assert_not_impl_any;
assert_not_impl_any!(Queue<*const (), 4>: Send);
assert_not_impl_any!(Producer<*const ()>: Send);
assert_not_impl_any!(Consumer<*const ()>: Send);
#[test]
fn const_split() {
use critical_section::Mutex;
use std::cell::RefCell;
use super::{Consumer, Producer};
#[allow(clippy::type_complexity)]
static PC: (
Mutex<RefCell<Option<Producer<'_, ()>>>>,
Mutex<RefCell<Option<Consumer<'_, ()>>>>,
) = {
static mut Q: Queue<(), 4> = Queue::new();
#[allow(static_mut_refs)]
let (p, c) = unsafe { Q.split_const() };
(
Mutex::new(RefCell::new(Some(p))),
Mutex::new(RefCell::new(Some(c))),
)
};
let producer = critical_section::with(|cs| PC.0.borrow_ref_mut(cs).take().unwrap());
let consumer = critical_section::with(|cs| PC.1.borrow_ref_mut(cs).take().unwrap());
let mut producer: Producer<'static, ()> = producer;
let mut consumer: Consumer<'static, ()> = consumer;
assert_eq!(producer.enqueue(()), Ok(()));
assert_eq!(consumer.dequeue(), Some(()));
}
#[test]
fn full() {
let mut rb: Queue<i32, 3> = Queue::new();
assert!(!rb.is_full());
rb.enqueue(1).unwrap();
assert!(!rb.is_full());
rb.enqueue(2).unwrap();
assert!(rb.is_full());
}
#[test]
fn empty() {
let mut rb: Queue<i32, 3> = Queue::new();
assert!(rb.is_empty());
rb.enqueue(1).unwrap();
assert!(!rb.is_empty());
rb.enqueue(2).unwrap();
assert!(!rb.is_empty());
}
#[test]
#[cfg_attr(miri, ignore)] fn len() {
let mut rb: Queue<i32, 3> = Queue::new();
assert_eq!(rb.len(), 0);
rb.enqueue(1).unwrap();
assert_eq!(rb.len(), 1);
rb.enqueue(2).unwrap();
assert_eq!(rb.len(), 2);
for _ in 0..1_000_000 {
let v = rb.dequeue().unwrap();
println!("{v}");
rb.enqueue(v).unwrap();
assert_eq!(rb.len(), 2);
}
}
#[test]
#[cfg_attr(miri, ignore)] fn try_overflow() {
const N: usize = 23;
let mut rb: Queue<i32, N> = Queue::new();
for i in 0..N as i32 - 1 {
rb.enqueue(i).unwrap();
}
for _ in 0..1_000_000 {
for i in 0..N as i32 - 1 {
let d = rb.dequeue().unwrap();
assert_eq!(d, i);
rb.enqueue(i).unwrap();
}
}
}
#[test]
fn sanity() {
let mut rb: Queue<i32, 10> = Queue::new();
let (mut p, mut c) = rb.split();
assert!(p.ready());
assert!(!c.ready());
assert_eq!(c.dequeue(), None);
p.enqueue(0).unwrap();
assert_eq!(c.dequeue(), Some(0));
}
#[test]
fn static_new() {
static mut _Q: Queue<i32, 4> = Queue::new();
}
#[test]
fn drop() {
struct Droppable;
impl Droppable {
fn new() -> Self {
unsafe {
COUNT += 1;
}
Self
}
}
impl Drop for Droppable {
fn drop(&mut self) {
unsafe {
COUNT -= 1;
}
}
}
static mut COUNT: i32 = 0;
{
let mut v: Queue<Droppable, 4> = Queue::new();
v.enqueue(Droppable::new()).ok().unwrap();
v.enqueue(Droppable::new()).ok().unwrap();
v.dequeue().unwrap();
}
assert_eq!(unsafe { COUNT }, 0);
{
let mut v: Queue<Droppable, 4> = Queue::new();
v.enqueue(Droppable::new()).ok().unwrap();
v.enqueue(Droppable::new()).ok().unwrap();
}
assert_eq!(unsafe { COUNT }, 0);
}
#[test]
fn iter() {
let mut rb: Queue<i32, 4> = Queue::new();
rb.enqueue(0).unwrap();
rb.dequeue().unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
rb.enqueue(3).unwrap();
let mut items = rb.iter();
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), Some(&2));
assert_eq!(items.next(), Some(&3));
assert_eq!(items.next(), None);
}
#[test]
fn iter_double_ended() {
let mut rb: Queue<i32, 4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter();
assert_eq!(items.next(), Some(&0));
assert_eq!(items.next_back(), Some(&2));
assert_eq!(items.next(), Some(&1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}
#[test]
fn iter_mut() {
let mut rb: Queue<i32, 4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter_mut();
assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next(), Some(&mut 1));
assert_eq!(items.next(), Some(&mut 2));
assert_eq!(items.next(), None);
}
#[test]
fn iter_mut_double_ended() {
let mut rb: Queue<i32, 4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
let mut items = rb.iter_mut();
assert_eq!(items.next(), Some(&mut 0));
assert_eq!(items.next_back(), Some(&mut 2));
assert_eq!(items.next(), Some(&mut 1));
assert_eq!(items.next(), None);
assert_eq!(items.next_back(), None);
}
#[test]
fn wrap_around() {
let mut rb: Queue<i32, 4> = Queue::new();
rb.enqueue(0).unwrap();
rb.enqueue(1).unwrap();
rb.enqueue(2).unwrap();
rb.dequeue().unwrap();
rb.dequeue().unwrap();
rb.dequeue().unwrap();
rb.enqueue(3).unwrap();
rb.enqueue(4).unwrap();
assert_eq!(rb.len(), 2);
}
#[test]
fn ready_flag() {
let mut rb: Queue<i32, 3> = Queue::new();
let (mut p, mut c) = rb.split();
assert!(!c.ready());
assert!(p.ready());
p.enqueue(0).unwrap();
assert!(c.ready());
assert!(p.ready());
p.enqueue(1).unwrap();
assert!(c.ready());
assert!(!p.ready());
c.dequeue().unwrap();
assert!(c.ready());
assert!(p.ready());
c.dequeue().unwrap();
assert!(!c.ready());
assert!(p.ready());
}
#[test]
fn clone() {
let mut rb1: Queue<i32, 4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
let rb2 = rb1.clone();
assert_eq!(rb1.capacity(), rb2.capacity());
assert_eq!(rb1.len(), rb2.len());
assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
}
#[test]
fn eq() {
let mut rb1: Queue<i32, 4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
let mut rb2: Queue<i32, 4> = Queue::new();
rb2.enqueue(0).unwrap();
rb2.enqueue(0).unwrap();
assert!(rb1 == rb2);
assert!(rb2 == rb1);
rb1.enqueue(0).unwrap();
assert!(rb1 != rb2);
rb2.enqueue(1).unwrap();
assert!(rb1 != rb2);
assert!(rb1 == rb1);
assert!(rb2 == rb2);
}
#[test]
fn hash_equality() {
let rb1 = {
let mut rb1: Queue<i32, 4> = Queue::new();
rb1.enqueue(0).unwrap();
rb1.enqueue(0).unwrap();
rb1.dequeue().unwrap();
rb1.enqueue(0).unwrap();
rb1
};
let rb2 = {
let mut rb2: Queue<i32, 4> = Queue::new();
rb2.enqueue(0).unwrap();
rb2.enqueue(0).unwrap();
rb2
};
let hash1 = {
let mut hasher1 = hash32::FnvHasher::default();
rb1.hash(&mut hasher1);
hasher1.finish()
};
let hash2 = {
let mut hasher2 = hash32::FnvHasher::default();
rb2.hash(&mut hasher2);
hasher2.finish()
};
assert_eq!(hash1, hash2);
}
}