use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use crossbeam_utils::CachePadded;
use crate::err::{PopError, PushError};
/// State shared between the `Producer` and the `Consumer` of one queue.
///
/// `head` and `tail` are positions in the range `0 .. 2 * cap` (see `increment`);
/// `slot` maps such a position onto one of the `cap` slots behind `buffer`.
struct Inner<T> {
/// Position of the next element to pop; stored by `Consumer::pop`.
head: CachePadded<AtomicUsize>,
/// Position of the next slot to fill; stored by `Producer::push`.
tail: CachePadded<AtomicUsize>,
/// Start of the allocation that holds `cap` slots of `T`.
buffer: *mut T,
/// Number of slots in `buffer`.
cap: usize,
/// Marks that this type logically owns values of type `T` (they are dropped in `Drop`).
_marker: PhantomData<T>,
}
impl<T> Inner<T> {
    /// Returns a pointer to the buffer slot that position `pos` maps to.
    ///
    /// Positions below `cap` map to themselves; every other position maps to `pos - cap`.
    ///
    /// # Safety
    ///
    /// `pos` must be less than `2 * cap`, so that the computed offset stays inside the
    /// `cap`-slot allocation behind `buffer`.
    #[inline]
    unsafe fn slot(&self, pos: usize) -> *mut T {
        let offset = if pos >= self.cap { pos - self.cap } else { pos };
        self.buffer.add(offset)
    }

    /// Returns the position that follows `pos`, wrapping to `0` after `2 * cap - 1`.
    #[inline]
    fn increment(&self, pos: usize) -> usize {
        let last = 2 * self.cap - 1;
        if pos >= last {
            0
        } else {
            pos + 1
        }
    }

    /// Returns how many steps of `increment` lead from position `a` to position `b`.
    #[inline]
    fn distance(&self, a: usize, b: usize) -> usize {
        if a > b {
            // `b` has already wrapped around while `a` has not.
            2 * self.cap - a + b
        } else {
            b - a
        }
    }
}
impl<T> Drop for Inner<T> {
    /// Drops every element still stored in the queue, then frees the buffer.
    fn drop(&mut self) {
        let end = self.tail.load(Ordering::Relaxed);
        let mut cursor = self.head.load(Ordering::Relaxed);

        // Positions from `head` up to (but excluding) `tail` hold live values.
        loop {
            if cursor == end {
                break;
            }
            unsafe {
                self.slot(cursor).drop_in_place();
            }
            cursor = self.increment(cursor);
        }

        // Rebuild the vector with length zero so that dropping it releases the allocation
        // without running any element destructors a second time.
        let storage = unsafe { Vec::from_raw_parts(self.buffer, 0, self.cap) };
        drop(storage);
    }
}
/// Creates a bounded queue with room for `cap` elements and returns its two endpoints.
///
/// Both endpoints start with head and tail positions of `0`, i.e. with an empty queue.
///
/// # Panics
///
/// Panics if `cap` is zero, or if `Vec::with_capacity(cap)` panics.
pub fn new<T>(cap: usize) -> (Producer<T>, Consumer<T>) {
    assert!(cap > 0, "capacity must be non-zero");

    // Allocate uninitialized storage and keep only the raw pointer; the wrapper prevents
    // the vector from freeing it. `Inner`'s `Drop` implementation releases it again.
    let mut storage = mem::ManuallyDrop::new(Vec::<T>::with_capacity(cap));
    let buffer = storage.as_mut_ptr();

    let shared = Arc::new(Inner {
        head: CachePadded::new(AtomicUsize::new(0)),
        tail: CachePadded::new(AtomicUsize::new(0)),
        buffer,
        cap,
        _marker: PhantomData,
    });

    let producer = Producer {
        inner: Arc::clone(&shared),
        head: Cell::new(0),
        tail: Cell::new(0),
    };
    let consumer = Consumer {
        inner: shared,
        head: Cell::new(0),
        tail: Cell::new(0),
    };
    (producer, consumer)
}
/// The pushing endpoint of a queue created by `new`.
pub struct Producer<T> {
/// State shared with the `Consumer`.
inner: Arc<Inner<T>>,
/// Last value of `inner.head` seen by this producer; refreshed in `push` and `len`.
head: Cell<usize>,
/// Position of the next slot to write; equals the value last stored to `inner.tail`.
tail: Cell<usize>,
}
// `Inner` contains a raw pointer, so `Send` is not implemented automatically.
// NOTE(review): soundness presumably relies on exactly one `Producer` and one `Consumer`
// existing per queue — confirm that neither type can be cloned.
unsafe impl<T: Send> Send for Producer<T> {}
impl<T> Producer<T> {
    /// Attempts to append `value` to the queue.
    ///
    /// # Errors
    ///
    /// Returns `PushError(value)`, handing the value back, when the queue is full.
    pub fn push(&self, value: T) -> Result<(), PushError<T>> {
        let inner = &*self.inner;
        let tail = self.tail.get();

        // Trust the cached head first; the shared atomic is only read when the cache
        // claims that the queue is full.
        if inner.distance(self.head.get(), tail) == inner.cap {
            let fresh = inner.head.load(Ordering::Acquire);
            self.head.set(fresh);
            if inner.distance(fresh, tail) == inner.cap {
                return Err(PushError(value));
            }
        }

        // The check above established that the slot at `tail` is not occupied.
        unsafe {
            inner.slot(tail).write(value);
        }

        // Publish the element: the shared tail first, then the local copy.
        let next = inner.increment(tail);
        inner.tail.store(next, Ordering::Release);
        self.tail.set(next);
        Ok(())
    }

    /// Returns the number of slots in the queue.
    pub fn capacity(&self) -> usize {
        let inner = &self.inner;
        inner.cap
    }

    /// Returns `true` if `len` currently reports zero elements.
    pub fn is_empty(&self) -> bool {
        let count = self.len();
        count == 0
    }

    /// Returns `true` if `len` currently reports `capacity` elements.
    pub fn is_full(&self) -> bool {
        let count = self.len();
        count == self.inner.cap
    }

    /// Returns the number of elements in the queue as seen by the producer.
    ///
    /// Reloads the shared head position and refreshes the cached copy as a side effect.
    pub fn len(&self) -> usize {
        let head = self.inner.head.load(Ordering::Acquire);
        self.head.set(head);
        self.inner.distance(head, self.tail.get())
    }
}
impl<T> fmt::Debug for Producer<T> {
    /// Writes the fixed placeholder `Producer { .. }`, applying the formatter's padding
    /// options; no queue state is printed.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "Producer { .. }";
        formatter.pad(placeholder)
    }
}
/// The popping endpoint of a queue created by `new`.
pub struct Consumer<T> {
/// State shared with the `Producer`.
inner: Arc<Inner<T>>,
/// Position of the next slot to read; equals the value last stored to `inner.head`.
head: Cell<usize>,
/// Last value of `inner.tail` seen by this consumer; refreshed in `pop`, `wait` and `len`.
tail: Cell<usize>,
}
// `Inner` contains a raw pointer, so `Send` is not implemented automatically.
// NOTE(review): soundness presumably relies on exactly one `Producer` and one `Consumer`
// existing per queue — confirm that neither type can be cloned.
unsafe impl<T: Send> Send for Consumer<T> {}
impl<T> Consumer<T> {
    /// Attempts to remove the element at the front of the queue.
    ///
    /// # Errors
    ///
    /// Returns `PopError` when the queue is empty.
    pub fn pop(&self) -> Result<T, PopError> {
        let head = self.head.get();

        // The cached tail may be stale, so the shared atomic is reloaded — but only when
        // the cache claims that the queue is empty.
        if head == self.tail.get() {
            let tail = self.inner.tail.load(Ordering::Acquire);
            self.tail.set(tail);
            if head == tail {
                return Err(PopError);
            }
        }

        // `head` differs from the observed tail, so this slot holds a value written by
        // `Producer::push`; reading it moves the value out of the buffer.
        let value = unsafe { self.inner.slot(head).read() };

        // Release the slot: the shared head first, then the local copy.
        let next = self.inner.increment(head);
        self.inner.head.store(next, Ordering::Release);
        self.head.set(next);
        Ok(value)
    }

    /// Returns the number of slots in the queue.
    pub fn capacity(&self) -> usize {
        self.inner.cap
    }

    /// Returns `true` if the queue holds no elements.
    ///
    /// When the cached positions already differ the queue is known to be non-empty and
    /// the shared tail is not reloaded; otherwise `len` is consulted.
    pub fn is_empty(&self) -> bool {
        self.head.get() == self.tail.get() && self.len() == 0
    }

    /// Returns `true` if `len` currently reports `capacity` elements.
    pub fn is_full(&self) -> bool {
        self.len() == self.inner.cap
    }

    /// Busy-waits until the queue holds at least one element.
    ///
    /// Returns immediately if an element is already available. The observed tail position
    /// is cached before returning.
    ///
    /// This call only returns once the shared tail differs from this consumer's head, so
    /// it spins forever if nothing is ever pushed.
    pub fn wait(&self) {
        let head = self.head.get();
        let tail = loop {
            let observed = self.inner.tail.load(Ordering::Acquire);
            if observed != head {
                break observed;
            }
            // Tell the CPU that this is a spin-wait so it can yield pipeline resources
            // (and save power) instead of hammering the cache line at full speed.
            std::hint::spin_loop();
        };
        self.tail.set(tail);
    }

    /// Returns the number of elements in the queue as seen by the consumer.
    ///
    /// Reloads the shared tail position and refreshes the cached copy as a side effect.
    pub fn len(&self) -> usize {
        let tail = self.inner.tail.load(Ordering::Acquire);
        self.tail.set(tail);
        self.inner.distance(self.head.get(), tail)
    }
}
impl<T> fmt::Debug for Consumer<T> {
    /// Writes the fixed placeholder `Consumer { .. }`, applying the formatter's padding
    /// options; no queue state is printed.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let placeholder = "Consumer { .. }";
        formatter.pad(placeholder)
    }
}