use core::{
cell::UnsafeCell,
fmt, hash,
mem::MaybeUninit,
ptr,
sync::atomic::{AtomicUsize, Ordering},
};
/// Fixed-size FIFO ring buffer stored inline in an array of `N` slots.
///
/// `capacity()` reports `N - 1`: `is_full()` treats the queue as full once advancing
/// `tail` would make it equal to `head`, so one slot always stays unused.
pub struct Queue<T, const N: usize> {
// Index of the slot the next dequeue reads from. Within this file it is only ever
// stored to by the dequeue paths, always with a value reduced modulo `N`.
pub(crate) head: AtomicUsize,
// Index of the slot the next enqueue writes to. Within this file it is only ever
// stored to by the enqueue paths, always with a value reduced modulo `N`.
pub(crate) tail: AtomicUsize,
// Slot storage. `MaybeUninit` because slots outside the `head..tail` window hold no
// live value; `UnsafeCell` because the enqueue/dequeue helpers write and read slots
// through `&self`.
pub(crate) buffer: [UnsafeCell<MaybeUninit<T>>; N],
}
impl<T, const N: usize> Queue<T, N> {
    /// A single vacant slot. Being a `const` item, it can seed the array-repeat expression
    /// in [`Queue::new`] even though `UnsafeCell<MaybeUninit<T>>` is not `Copy`.
    const INIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());

    /// Returns the ring index that follows `val`, wrapping back to `0` after `N - 1`.
    ///
    /// Panics (remainder by zero) when `N == 0`.
    #[inline]
    fn increment(val: usize) -> usize {
        (val + 1) % N
    }

    /// Creates an empty queue.
    ///
    /// One slot is always left vacant so that "full" and "empty" can be told apart, which
    /// means the queue holds at most `N - 1` elements; choose `N >= 2` for a usable queue.
    pub const fn new() -> Self {
        Queue {
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
            buffer: [Self::INIT; N],
        }
    }

    /// Returns the maximum number of elements the queue can hold, i.e. `N - 1`.
    #[inline]
    pub const fn capacity(&self) -> usize {
        N - 1
    }

    /// Returns the number of elements currently stored.
    #[inline]
    pub fn len(&self) -> usize {
        let current_head = self.head.load(Ordering::Relaxed);
        let current_tail = self.tail.load(Ordering::Relaxed);
        // Both indices are below `N`. When `tail` has wrapped around behind `head` the
        // subtraction wraps too; adding `N` back before the reduction yields the distance.
        current_tail.wrapping_sub(current_head).wrapping_add(N) % N
    }

    /// Returns `true` if the queue holds no elements.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.head.load(Ordering::Relaxed) == self.tail.load(Ordering::Relaxed)
    }

    /// Returns `true` if no further element can be enqueued.
    #[inline]
    pub fn is_full(&self) -> bool {
        Self::increment(self.tail.load(Ordering::Relaxed)) == self.head.load(Ordering::Relaxed)
    }

    /// Returns an iterator over the stored elements, starting at the front (the element
    /// that would be dequeued next).
    pub fn iter(&self) -> Iter<'_, T, N> {
        Iter {
            rb: self,
            index: 0,
            len: self.len(),
        }
    }

    /// Returns an iterator yielding mutable references to the stored elements, starting
    /// at the front.
    pub fn iter_mut(&mut self) -> IterMut<'_, T, N> {
        // Read the length first: `self` is moved into the iterator below.
        let len = self.len();
        IterMut {
            rb: self,
            index: 0,
            len,
        }
    }

    /// Appends `val` at the back of the queue.
    ///
    /// Returns `Err(val)`, handing the value back, if the queue is full.
    #[inline]
    pub fn enqueue(&mut self, val: T) -> Result<(), T> {
        // SAFETY: `&mut self` rules out any concurrent enqueue.
        unsafe { self.inner_enqueue(val) }
    }

    /// Removes and returns the element at the front of the queue, or `None` if it is empty.
    #[inline]
    pub fn dequeue(&mut self) -> Option<T> {
        // SAFETY: `&mut self` rules out any concurrent dequeue.
        unsafe { self.inner_dequeue() }
    }

    /// Returns a reference to the element at the front of the queue without removing it,
    /// or `None` if the queue is empty.
    pub fn peek(&self) -> Option<&T> {
        let head = self.head.load(Ordering::Relaxed);
        // `Acquire` pairs with the `Release` store of `tail` in the enqueue paths. This
        // method is reachable through `Consumer::peek` while a `Producer` may be enqueuing;
        // a `Relaxed` load here would not order the slot read below after the producer's
        // slot write. `inner_dequeue` uses the same ordering for the same reason.
        if head == self.tail.load(Ordering::Acquire) {
            None
        } else {
            // SAFETY: `head` is always `0` or a result of `increment`, hence below `N`.
            // `head != tail` means the slot at `head` has been written by an enqueue and
            // not yet been read out by a dequeue.
            Some(unsafe { &*(self.buffer.get_unchecked(head).get() as *const T) })
        }
    }

    /// Enqueue through a shared reference; `Producer` relies on this to operate on `&Queue`.
    ///
    /// # Safety
    ///
    /// Must not run concurrently with another enqueue on the same queue: both calls would
    /// write the same slot and store the same `tail`.
    unsafe fn inner_enqueue(&self, val: T) -> Result<(), T> {
        let current_tail = self.tail.load(Ordering::Relaxed);
        let next_tail = Self::increment(current_tail);
        // `Acquire` pairs with the `Release` store of `head` in the dequeue paths, so the
        // slot being reused has been fully read out before it is overwritten.
        if next_tail != self.head.load(Ordering::Acquire) {
            (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
            // `Release` publishes the slot write above to whoever acquires `tail`.
            self.tail.store(next_tail, Ordering::Release);
            Ok(())
        } else {
            Err(val)
        }
    }

    /// Like [`Queue::inner_enqueue`] but without the fullness check.
    ///
    /// # Safety
    ///
    /// In addition to the requirements of `inner_enqueue`, the queue must not be full.
    unsafe fn inner_enqueue_unchecked(&self, val: T) {
        let current_tail = self.tail.load(Ordering::Relaxed);
        (self.buffer.get_unchecked(current_tail).get()).write(MaybeUninit::new(val));
        self.tail
            .store(Self::increment(current_tail), Ordering::Release);
    }

    /// Appends `val` at the back of the queue without checking whether it is full.
    ///
    /// # Safety
    ///
    /// The queue must not be full. Enqueuing into a full queue makes `tail` equal to
    /// `head`, so the queue would then report itself as empty while its slots still hold
    /// live values.
    pub unsafe fn enqueue_unchecked(&mut self, val: T) {
        self.inner_enqueue_unchecked(val)
    }

    /// Dequeue through a shared reference; `Consumer` relies on this to operate on `&Queue`.
    ///
    /// # Safety
    ///
    /// Must not run concurrently with another dequeue on the same queue: both calls would
    /// move the same element out of its slot.
    unsafe fn inner_dequeue(&self) -> Option<T> {
        let current_head = self.head.load(Ordering::Relaxed);
        // `Acquire` pairs with the `Release` store of `tail` in the enqueue paths, making
        // the slot contents visible before they are read.
        if current_head == self.tail.load(Ordering::Acquire) {
            None
        } else {
            let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
            // `Release` tells the enqueue side that the slot has been read out.
            self.head
                .store(Self::increment(current_head), Ordering::Release);
            Some(v)
        }
    }

    /// Like [`Queue::inner_dequeue`] but without the emptiness check.
    ///
    /// # Safety
    ///
    /// In addition to the requirements of `inner_dequeue`, the queue must not be empty.
    unsafe fn inner_dequeue_unchecked(&self) -> T {
        let current_head = self.head.load(Ordering::Relaxed);
        let v = (self.buffer.get_unchecked(current_head).get() as *const T).read();
        self.head
            .store(Self::increment(current_head), Ordering::Release);
        v
    }

    /// Removes and returns the element at the front of the queue without checking whether
    /// the queue is empty.
    ///
    /// # Safety
    ///
    /// The queue must not be empty; otherwise a slot that holds no live value is read.
    pub unsafe fn dequeue_unchecked(&mut self) -> T {
        self.inner_dequeue_unchecked()
    }

    /// Splits the queue into an enqueue-only [`Producer`] and a dequeue-only [`Consumer`].
    ///
    /// Both handles borrow the queue for as long as they live, so the queue itself cannot
    /// be used directly until they are gone.
    pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
        (Producer { rb: self }, Consumer { rb: self })
    }
}
impl<T, const N: usize> Clone for Queue<T, N>
where
T: Clone,
{
fn clone(&self) -> Self {
let mut new: Queue<T, N> = Queue::new();
for s in self.iter() {
unsafe {
new.enqueue_unchecked(s.clone());
}
}
new
}
}
/// Queues compare equal when they hold equal elements in the same order; the backing
/// array sizes (`N` vs `N2`) and the internal index positions do not matter.
impl<T, const N: usize, const N2: usize> PartialEq<Queue<T, N2>> for Queue<T, N>
where
    T: PartialEq,
{
    fn eq(&self, other: &Queue<T, N2>) -> bool {
        // `zip` stops at the shorter side, so differing lengths must be ruled out first.
        if self.len() != other.len() {
            return false;
        }
        let mut pairs = self.iter().zip(other.iter());
        pairs.all(|(lhs, rhs)| lhs == rhs)
    }
}
/// Marker impl: queue equality is a full equivalence relation whenever `T`'s is.
impl<T: Eq, const N: usize> Eq for Queue<T, N> {}
/// Iterator over shared references to the elements of a `Queue`, created by `Queue::iter`.
pub struct Iter<'a, T, const N: usize> {
// The queue being iterated.
rb: &'a Queue<T, N>,
// Offset from the queue's `head` of the next element `next` yields.
index: usize,
// One past the offset of the last element still to be yielded; starts at the queue
// length and is decremented by `next_back`.
len: usize,
}
impl<'a, T, const N: usize> Clone for Iter<'a, T, N> {
fn clone(&self) -> Self {
Self {
rb: self.rb,
index: self.index,
len: self.len,
}
}
}
/// Iterator over mutable references to the elements of a `Queue`, created by
/// `Queue::iter_mut`.
pub struct IterMut<'a, T, const N: usize> {
// The queue being iterated, borrowed exclusively for the iterator's lifetime.
rb: &'a mut Queue<T, N>,
// Offset from the queue's `head` of the next element `next` yields.
index: usize,
// One past the offset of the last element still to be yielded; starts at the queue
// length and is decremented by `next_back`.
len: usize,
}
/// Yields the queued elements front to back.
impl<'a, T, const N: usize> Iterator for Iter<'a, T, N> {
    type Item = &'a T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.len {
            let head = self.rb.head.load(Ordering::Relaxed);
            // Elements sit in consecutive ring slots starting at `head`.
            let i = (head + self.index) % N;
            self.index += 1;
            // SAFETY: `i` is reduced modulo `N`, so it is in bounds. The offset is below
            // `len`, which never exceeds the queue length taken when the iterator was
            // created, so the slot holds a live value.
            Some(unsafe { &*(self.rb.buffer.get_unchecked(i).get() as *const T) })
        } else {
            None
        }
    }

    /// Reports the exact number of elements left, so that consumers such as `collect`
    /// can size their storage up front instead of seeing the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `index` never passes `len`: both ends only move while `index < len`.
        let remaining = self.len.saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
/// Yields mutable references to the queued elements front to back.
impl<'a, T, const N: usize> Iterator for IterMut<'a, T, N> {
    type Item = &'a mut T;

    fn next(&mut self) -> Option<Self::Item> {
        if self.index < self.len {
            let head = self.rb.head.load(Ordering::Relaxed);
            // Elements sit in consecutive ring slots starting at `head`.
            let i = (head + self.index) % N;
            self.index += 1;
            // SAFETY: `i` is reduced modulo `N`, so it is in bounds. The offset is below
            // `len`, which never exceeds the queue length taken when the iterator was
            // created, so the slot holds a live value. Each offset is produced at most
            // once (the front cursor only grows, the back cursor only shrinks), so the
            // returned `&mut` references never alias.
            Some(unsafe { &mut *(self.rb.buffer.get_unchecked(i).get() as *mut T) })
        } else {
            None
        }
    }

    /// Reports the exact number of elements left, so that consumers such as `collect`
    /// can size their storage up front instead of seeing the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        // `index` never passes `len`: both ends only move while `index < len`.
        let remaining = self.len.saturating_sub(self.index);
        (remaining, Some(remaining))
    }
}
/// Allows taking elements from the back (most recently enqueued) end.
impl<'a, T, const N: usize> DoubleEndedIterator for Iter<'a, T, N> {
    fn next_back(&mut self) -> Option<Self::Item> {
        // Front and back cursors have met: nothing left to yield.
        if self.index >= self.len {
            return None;
        }
        let front = self.rb.head.load(Ordering::Relaxed);
        // `len > index >= 0` here, so `len - 1` cannot underflow.
        let slot = (front + self.len - 1) % N;
        self.len -= 1;
        // SAFETY: `slot` is reduced modulo `N`, and its offset from `head` is below the
        // queue length taken when the iterator was created.
        let cell = unsafe { self.rb.buffer.get_unchecked(slot) };
        Some(unsafe { &*(cell.get() as *const T) })
    }
}
/// Allows taking mutable references from the back (most recently enqueued) end.
impl<'a, T, const N: usize> DoubleEndedIterator for IterMut<'a, T, N> {
    fn next_back(&mut self) -> Option<Self::Item> {
        // Front and back cursors have met: nothing left to yield.
        if self.index >= self.len {
            return None;
        }
        let front = self.rb.head.load(Ordering::Relaxed);
        // `len > index >= 0` here, so `len - 1` cannot underflow.
        let slot = (front + self.len - 1) % N;
        self.len -= 1;
        // SAFETY: `slot` is reduced modulo `N`, and its offset from `head` is below the
        // queue length taken when the iterator was created. The back cursor only
        // shrinks, so this slot is never handed out a second time.
        let cell = unsafe { self.rb.buffer.get_unchecked(slot) };
        Some(unsafe { &mut *(cell.get() as *mut T) })
    }
}
/// Drops every element still stored in the queue. The slots are `MaybeUninit`, so
/// nothing would run the elements' destructors otherwise.
impl<T, const N: usize> Drop for Queue<T, N> {
    fn drop(&mut self) {
        self.iter_mut().for_each(|element| {
            // SAFETY: `iter_mut` yields each live element exactly once, and the queue
            // is not used again after `drop`.
            unsafe { ptr::drop_in_place(element) }
        });
    }
}
/// Formats the queue as a list of its elements, front to back.
impl<T, const N: usize> fmt::Debug for Queue<T, N>
where
    T: fmt::Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut list = f.debug_list();
        for element in self.iter() {
            list.entry(element);
        }
        list.finish()
    }
}
/// Hashes the elements in iteration order (front to back). Only the elements are fed to
/// the hasher, so queues with equal contents hash alike regardless of where `head` sits.
impl<T, const N: usize> hash::Hash for Queue<T, N>
where
    T: hash::Hash,
{
    fn hash<H: hash::Hasher>(&self, state: &mut H) {
        self.iter()
            .for_each(|element| <T as hash::Hash>::hash(element, state));
    }
}
/// `hash32` counterpart of the `core::hash::Hash` impl: feeds the elements to the hasher
/// in iteration order (front to back).
impl<T, const N: usize> hash32::Hash for Queue<T, N>
where
    T: hash32::Hash,
{
    fn hash<H: hash32::Hasher>(&self, state: &mut H) {
        self.iter()
            .for_each(|element| <T as hash32::Hash>::hash(element, state));
    }
}
impl<'a, T, const N: usize> IntoIterator for &'a Queue<T, N> {
type Item = &'a T;
type IntoIter = Iter<'a, T, N>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a, T, const N: usize> IntoIterator for &'a mut Queue<T, N> {
type Item = &'a mut T;
type IntoIter = IterMut<'a, T, N>;
fn into_iter(self) -> Self::IntoIter {
self.iter_mut()
}
}
/// Dequeue-side handle of a `Queue`, obtained from `Queue::split`.
pub struct Consumer<'a, T, const N: usize> {
// Shared borrow of the queue; the matching `Producer` holds a borrow of the same queue.
rb: &'a Queue<T, N>,
}
// NOTE(review): presumably sound because, within this file, a `Consumer` only ever stores
// to `head` and reads slots the producer has already published through a `Release` store
// of `tail` — confirm that no other code in the crate hands out a second consumer.
// `T: Send` is required because dequeued values are moved to the consumer's thread.
unsafe impl<'a, T, const N: usize> Send for Consumer<'a, T, N> where T: Send {}
/// Enqueue-side handle of a `Queue`, obtained from `Queue::split`.
pub struct Producer<'a, T, const N: usize> {
// Shared borrow of the queue; the matching `Consumer` holds a borrow of the same queue.
rb: &'a Queue<T, N>,
}
// NOTE(review): presumably sound because, within this file, a `Producer` only ever stores
// to `tail` and writes slots the consumer has already released through a `Release` store
// of `head` — confirm that no other code in the crate hands out a second producer.
// `T: Send` is required because enqueued values may be dequeued on another thread.
unsafe impl<'a, T, const N: usize> Send for Producer<'a, T, N> where T: Send {}
impl<'a, T, const N: usize> Consumer<'a, T, N> {
#[inline]
pub fn dequeue(&mut self) -> Option<T> {
unsafe { self.rb.inner_dequeue() }
}
#[inline]
pub unsafe fn dequeue_unchecked(&mut self) -> T {
self.rb.inner_dequeue_unchecked()
}
#[inline]
pub fn ready(&self) -> bool {
!self.rb.is_empty()
}
#[inline]
pub fn len(&self) -> usize {
self.rb.len()
}
#[inline]
pub fn capacity(&self) -> usize {
self.rb.capacity()
}
#[inline]
pub fn peek(&self) -> Option<&T> {
self.rb.peek()
}
}
impl<'a, T, const N: usize> Producer<'a, T, N> {
#[inline]
pub fn enqueue(&mut self, val: T) -> Result<(), T> {
unsafe { self.rb.inner_enqueue(val) }
}
#[inline]
pub unsafe fn enqueue_unchecked(&mut self, val: T) {
self.rb.inner_enqueue_unchecked(val)
}
#[inline]
pub fn ready(&self) -> bool {
!self.rb.is_full()
}
#[inline]
pub fn len(&self) -> usize {
self.rb.len()
}
#[inline]
pub fn capacity(&self) -> usize {
self.rb.capacity()
}
}
#[cfg(test)]
mod tests {
    use crate::spsc::Queue;
    use hash32::Hasher;

    /// A `Queue<_, 3>` holds two elements and reports full only after the second.
    #[test]
    fn full() {
        let mut rb: Queue<i32, 3> = Queue::new();
        assert!(!rb.is_full());
        rb.enqueue(1).unwrap();
        assert!(!rb.is_full());
        rb.enqueue(2).unwrap();
        assert!(rb.is_full());
    }

    /// A new queue is empty and stops being so after the first enqueue.
    #[test]
    fn empty() {
        let mut rb: Queue<i32, 3> = Queue::new();
        assert!(rb.is_empty());
        rb.enqueue(1).unwrap();
        assert!(!rb.is_empty());
        rb.enqueue(2).unwrap();
        assert!(!rb.is_empty());
    }

    /// `len` stays correct while the indices wrap around the ring many times.
    #[test]
    fn len() {
        let mut rb: Queue<i32, 3> = Queue::new();
        assert_eq!(rb.len(), 0);
        rb.enqueue(1).unwrap();
        assert_eq!(rb.len(), 1);
        rb.enqueue(2).unwrap();
        assert_eq!(rb.len(), 2);
        // No per-iteration printing: a million lines of output only slow the test down.
        for _ in 0..1_000_000 {
            let v = rb.dequeue().unwrap();
            rb.enqueue(v).unwrap();
            assert_eq!(rb.len(), 2);
        }
    }

    /// FIFO order survives repeated wrap-around with a size that is not a power of two.
    #[test]
    fn try_overflow() {
        const N: usize = 23;
        let mut rb: Queue<i32, N> = Queue::new();
        for i in 0..N as i32 - 1 {
            rb.enqueue(i).unwrap();
        }
        for _ in 0..1_000_000 {
            for i in 0..N as i32 - 1 {
                let d = rb.dequeue().unwrap();
                assert_eq!(d, i);
                rb.enqueue(i).unwrap();
            }
        }
    }

    /// Basic round trip through the split producer/consumer handles.
    #[test]
    fn sanity() {
        let mut rb: Queue<i32, 10> = Queue::new();
        let (mut p, mut c) = rb.split();
        assert!(p.ready());
        assert!(!c.ready());
        assert_eq!(c.dequeue(), None);
        p.enqueue(0).unwrap();
        assert_eq!(c.dequeue(), Some(0));
    }

    /// `Queue::new` must be usable in a `static` initializer (compile-time check only).
    #[test]
    fn static_new() {
        static mut _Q: Queue<i32, 4> = Queue::new();
    }

    /// Dropping the queue drops exactly the elements still stored in it.
    #[test]
    fn drop() {
        struct Droppable;
        impl Droppable {
            fn new() -> Self {
                unsafe {
                    COUNT += 1;
                }
                Droppable
            }
        }
        impl Drop for Droppable {
            fn drop(&mut self) {
                unsafe {
                    COUNT -= 1;
                }
            }
        }
        // Number of live `Droppable` values.
        static mut COUNT: i32 = 0;
        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.dequeue().unwrap();
        }
        assert_eq!(unsafe { COUNT }, 0);
        {
            let mut v: Queue<Droppable, 4> = Queue::new();
            v.enqueue(Droppable::new()).ok().unwrap();
            v.enqueue(Droppable::new()).ok().unwrap();
        }
        assert_eq!(unsafe { COUNT }, 0);
    }

    /// Iteration starts at the current front and crosses the wrap-around point.
    #[test]
    fn iter() {
        let mut rb: Queue<i32, 4> = Queue::new();
        rb.enqueue(0).unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.enqueue(3).unwrap();
        let mut items = rb.iter();
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), Some(&2));
        assert_eq!(items.next(), Some(&3));
        assert_eq!(items.next(), None);
    }

    /// Front and back cursors of `Iter` meet without yielding an element twice.
    #[test]
    fn iter_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();
        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        let mut items = rb.iter();
        assert_eq!(items.next(), Some(&0));
        assert_eq!(items.next_back(), Some(&2));
        assert_eq!(items.next(), Some(&1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    /// `IterMut` yields the elements front to back.
    #[test]
    fn iter_mut() {
        let mut rb: Queue<i32, 4> = Queue::new();
        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        let mut items = rb.iter_mut();
        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), Some(&mut 2));
        assert_eq!(items.next(), None);
    }

    /// Front and back cursors of `IterMut` meet without yielding an element twice.
    #[test]
    fn iter_mut_double_ended() {
        let mut rb: Queue<i32, 4> = Queue::new();
        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        let mut items = rb.iter_mut();
        assert_eq!(items.next(), Some(&mut 0));
        assert_eq!(items.next_back(), Some(&mut 2));
        assert_eq!(items.next(), Some(&mut 1));
        assert_eq!(items.next(), None);
        assert_eq!(items.next_back(), None);
    }

    /// `len` is correct when `tail` has wrapped around behind `head`.
    #[test]
    fn wrap_around() {
        let mut rb: Queue<i32, 4> = Queue::new();
        rb.enqueue(0).unwrap();
        rb.enqueue(1).unwrap();
        rb.enqueue(2).unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.dequeue().unwrap();
        rb.enqueue(3).unwrap();
        rb.enqueue(4).unwrap();
        assert_eq!(rb.len(), 2);
    }

    /// `ready` on both handles follows the fill level through a fill/drain cycle.
    #[test]
    fn ready_flag() {
        let mut rb: Queue<i32, 3> = Queue::new();
        let (mut p, mut c) = rb.split();
        assert!(!c.ready());
        assert!(p.ready());
        p.enqueue(0).unwrap();
        assert!(c.ready());
        assert!(p.ready());
        p.enqueue(1).unwrap();
        assert!(c.ready());
        assert!(!p.ready());
        c.dequeue().unwrap();
        assert!(c.ready());
        assert!(p.ready());
        c.dequeue().unwrap();
        assert!(!c.ready());
        assert!(p.ready());
    }

    /// A clone has the same capacity, length and contents as its source.
    #[test]
    fn clone() {
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let rb2 = rb1.clone();
        assert_eq!(rb1.capacity(), rb2.capacity());
        assert_eq!(rb1.len(), rb2.len());
        assert!(rb1.iter().zip(rb2.iter()).all(|(v1, v2)| v1 == v2));
    }

    /// Equality looks at the contents only, not at where `head` happens to sit.
    #[test]
    fn eq() {
        let mut rb1: Queue<i32, 4> = Queue::new();
        rb1.enqueue(0).unwrap();
        rb1.enqueue(0).unwrap();
        rb1.dequeue().unwrap();
        rb1.enqueue(0).unwrap();
        let mut rb2: Queue<i32, 4> = Queue::new();
        rb2.enqueue(0).unwrap();
        rb2.enqueue(0).unwrap();
        assert!(rb1 == rb2);
        assert!(rb2 == rb1);
        rb1.enqueue(0).unwrap();
        assert!(rb1 != rb2);
        rb2.enqueue(1).unwrap();
        assert!(rb1 != rb2);
        // Reflexivity.
        assert!(rb1 == rb1);
        assert!(rb2 == rb2);
    }

    /// Queues with equal contents hash alike even when their `head` positions differ.
    #[test]
    fn hash_equality() {
        let rb1 = {
            let mut rb1: Queue<i32, 4> = Queue::new();
            rb1.enqueue(0).unwrap();
            rb1.enqueue(0).unwrap();
            rb1.dequeue().unwrap();
            rb1.enqueue(0).unwrap();
            rb1
        };
        let rb2 = {
            let mut rb2: Queue<i32, 4> = Queue::new();
            rb2.enqueue(0).unwrap();
            rb2.enqueue(0).unwrap();
            rb2
        };
        let hash1 = {
            let mut hasher1 = hash32::FnvHasher::default();
            hash32::Hash::hash(&rb1, &mut hasher1);
            hasher1.finish()
        };
        let hash2 = {
            let mut hasher2 = hash32::FnvHasher::default();
            hash32::Hash::hash(&rb2, &mut hasher2);
            hasher2.finish()
        };
        assert_eq!(hash1, hash2);
    }
}