use alloc::vec::Vec;
use core::cell::UnsafeCell;
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::ptr;
use core::sync::atomic::{self, AtomicUsize, Ordering};
use crossbeam_utils::{Backoff, CachePadded};
use err::{PopError, PushError};
struct Slot<T> {
stamp: AtomicUsize,
value: UnsafeCell<T>,
}
pub struct ArrayQueue<T> {
head: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicUsize>,
buffer: *mut Slot<T>,
cap: usize,
one_lap: usize,
_marker: PhantomData<T>,
}
unsafe impl<T: Send> Sync for ArrayQueue<T> {}
unsafe impl<T: Send> Send for ArrayQueue<T> {}
impl<T> ArrayQueue<T> {
pub fn new(cap: usize) -> ArrayQueue<T> {
assert!(cap > 0, "capacity must be non-zero");
let head = 0;
let tail = 0;
let buffer = {
let mut v = Vec::<Slot<T>>::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
ptr
};
for i in 0..cap {
unsafe {
let slot = buffer.add(i);
ptr::write(&mut (*slot).stamp, AtomicUsize::new(i));
}
}
let one_lap = (cap + 1).next_power_of_two();
ArrayQueue {
buffer,
cap,
one_lap,
head: CachePadded::new(AtomicUsize::new(head)),
tail: CachePadded::new(AtomicUsize::new(tail)),
_marker: PhantomData,
}
}
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
let backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
loop {
let index = tail & (self.one_lap - 1);
let lap = tail & !(self.one_lap - 1);
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
if tail == stamp {
let new_tail = if index + 1 < self.cap {
tail + 1
} else {
lap.wrapping_add(self.one_lap)
};
match self.tail.compare_exchange_weak(
tail,
new_tail,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
unsafe {
slot.value.get().write(value);
}
slot.stamp.store(tail + 1, Ordering::Release);
return Ok(());
}
Err(t) => {
tail = t;
backoff.spin();
}
}
} else if stamp.wrapping_add(self.one_lap) == tail + 1 {
atomic::fence(Ordering::SeqCst);
let head = self.head.load(Ordering::Relaxed);
if head.wrapping_add(self.one_lap) == tail {
return Err(PushError(value));
}
backoff.spin();
tail = self.tail.load(Ordering::Relaxed);
} else {
backoff.snooze();
tail = self.tail.load(Ordering::Relaxed);
}
}
}
pub fn pop(&self) -> Result<T, PopError> {
let backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
loop {
let index = head & (self.one_lap - 1);
let lap = head & !(self.one_lap - 1);
let slot = unsafe { &*self.buffer.add(index) };
let stamp = slot.stamp.load(Ordering::Acquire);
if head + 1 == stamp {
let new = if index + 1 < self.cap {
head + 1
} else {
lap.wrapping_add(self.one_lap)
};
match self.head.compare_exchange_weak(
head,
new,
Ordering::SeqCst,
Ordering::Relaxed,
) {
Ok(_) => {
let msg = unsafe { slot.value.get().read() };
slot.stamp
.store(head.wrapping_add(self.one_lap), Ordering::Release);
return Ok(msg);
}
Err(h) => {
head = h;
backoff.spin();
}
}
} else if stamp == head {
atomic::fence(Ordering::SeqCst);
let tail = self.tail.load(Ordering::Relaxed);
if tail == head {
return Err(PopError);
}
backoff.spin();
head = self.head.load(Ordering::Relaxed);
} else {
backoff.snooze();
head = self.head.load(Ordering::Relaxed);
}
}
}
pub fn capacity(&self) -> usize {
self.cap
}
pub fn is_empty(&self) -> bool {
let head = self.head.load(Ordering::SeqCst);
let tail = self.tail.load(Ordering::SeqCst);
tail == head
}
pub fn is_full(&self) -> bool {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
head.wrapping_add(self.one_lap) == tail
}
pub fn len(&self) -> usize {
loop {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);
if self.tail.load(Ordering::SeqCst) == tail {
let hix = head & (self.one_lap - 1);
let tix = tail & (self.one_lap - 1);
return if hix < tix {
tix - hix
} else if hix > tix {
self.cap - hix + tix
} else if tail == head {
0
} else {
self.cap
};
}
}
}
}
impl<T> Drop for ArrayQueue<T> {
fn drop(&mut self) {
let hix = self.head.load(Ordering::Relaxed) & (self.one_lap - 1);
for i in 0..self.len() {
let index = if hix + i < self.cap {
hix + i
} else {
hix + i - self.cap
};
unsafe {
self.buffer.add(index).drop_in_place();
}
}
unsafe {
Vec::from_raw_parts(self.buffer, 0, self.cap);
}
}
}
impl<T> fmt::Debug for ArrayQueue<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("ArrayQueue { .. }")
}
}