//! A bounded single-producer single-consumer queue.
//!
//! Lifted from an [unpublished branch of crossbeam](https://github.com/stjepang/crossbeam/tree/spsc/crossbeam-queue/src)
//!
//! # Examples
//!
//! ```
//! use rustasim::spsc;
//!
//! let (p, c) = spsc::new(2);
//!
//! assert!(p.push(1).is_ok());
//! assert!(p.push(2).is_ok());
//! assert!(p.push(3).is_err());
//!
//! assert_eq!(c.pop(), Ok(1));
//! assert_eq!(c.pop(), Ok(2));
//! assert!(c.pop().is_err());
//! ```
use std::cell::Cell;
use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
//use std::thread;
//use crossbeam_utils::Backoff;
use crossbeam_utils::CachePadded;
use crate::err::{PopError, PushError};
/// The inner representation of a single-producer single-consumer queue.
struct Inner<T> {
/// The head of the queue.
///
/// This integer is in range `0 .. 2 * cap`.
head: CachePadded<AtomicUsize>,
/// The tail of the queue.
///
/// This integer is in range `0 .. 2 * cap`.
tail: CachePadded<AtomicUsize>,
/// The buffer holding slots.
buffer: *mut T,
/// The queue capacity.
cap: usize,
/// Indicates that dropping a `Buffer<T>` may drop elements of type `T`.
_marker: PhantomData<T>,
}
impl<T> Inner<T> {
/// Returns a pointer to the slot at position `pos`.
///
/// The position must be in range `0 .. 2 * cap`.
#[inline]
unsafe fn slot(&self, pos: usize) -> *mut T {
if pos < self.cap {
self.buffer.add(pos)
} else {
self.buffer.add(pos - self.cap)
}
}
/// Increments a position by going one slot forward.
///
/// The position must be in range `0 .. 2 * cap`.
#[inline]
fn increment(&self, pos: usize) -> usize {
if pos < 2 * self.cap - 1 {
pos + 1
} else {
0
}
}
/// Returns the distance between two positions.
///
/// Positions must be in range `0 .. 2 * cap`.
#[inline]
fn distance(&self, a: usize, b: usize) -> usize {
if a <= b {
b - a
} else {
2 * self.cap - a + b
}
}
}
impl<T> Drop for Inner<T> {
fn drop(&mut self) {
let mut head = self.head.load(Ordering::Relaxed);
let tail = self.tail.load(Ordering::Relaxed);
// Loop over all slots that hold a value and drop them.
while head != tail {
unsafe {
self.slot(head).drop_in_place();
}
head = self.increment(head);
}
// Finally, deallocate the buffer, but don't run any destructors.
unsafe {
Vec::from_raw_parts(self.buffer, 0, self.cap);
}
}
}
/// Creates a bounded single-producer single-consumer queue with the given capacity.
///
/// Returns the producer and the consumer side for the queue.
///
/// # Panics
///
/// Panics if the capacity is zero.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
/// ```
pub fn new<T>(cap: usize) -> (Producer<T>, Consumer<T>) {
assert!(cap > 0, "capacity must be non-zero");
// Allocate a buffer of length `cap`.
let buffer = {
let mut v = Vec::<T>::with_capacity(cap);
let ptr = v.as_mut_ptr();
mem::forget(v);
ptr
};
let inner = Arc::new(Inner {
head: CachePadded::new(AtomicUsize::new(0)),
tail: CachePadded::new(AtomicUsize::new(0)),
buffer,
cap,
_marker: PhantomData,
});
let p = Producer {
inner: inner.clone(),
head: Cell::new(0),
tail: Cell::new(0),
};
let c = Consumer {
inner,
head: Cell::new(0),
tail: Cell::new(0),
};
(p, c)
}
/// The producer side of a bounded single-producer single-consumer queue.
///
/// # Examples
///
/// ```
/// use rustasim::{spsc, PushError};
///
/// let (p, c) = spsc::new::<i32>(1);
///
/// assert_eq!(p.push(10), Ok(()));
/// assert_eq!(p.push(20), Err(PushError(20)));
///
/// assert!(!p.is_empty());
/// assert!(p.is_full());
/// ```
pub struct Producer<T> {
/// The inner representation of the queue.
inner: Arc<Inner<T>>,
/// A copy of `inner.head` for quick access.
///
/// This value can be stale and sometimes needs to be resynchronized with `inner.head`.
head: Cell<usize>,
/// A copy of `inner.tail` for quick access.
///
/// This value is always in sync with `inner.tail`.
tail: Cell<usize>,
}
unsafe impl<T: Send> Send for Producer<T> {}
impl<T> Producer<T> {
/// Attempts to push an element into the queue.
///
/// If the queue is full, the element is returned back as an error.
///
/// # Examples
///
/// ```
/// use rustasim::{spsc, PushError};
///
/// let (p, c) = spsc::new(1);
///
/// assert_eq!(p.push(10), Ok(()));
/// assert_eq!(p.push(20), Err(PushError(20)));
/// ```
pub fn push(&self, value: T) -> Result<(), PushError<T>> {
let mut head = self.head.get();
let mut tail = self.tail.get();
// Check if the queue is *possibly* full.
if self.inner.distance(head, tail) == self.inner.cap {
// We need to refresh the head and check again if the queue is *really* full.
head = self.inner.head.load(Ordering::Acquire);
self.head.set(head);
// Is the queue *really* full?
if self.inner.distance(head, tail) == self.inner.cap {
return Err(PushError(value));
}
}
// Write the value into the tail slot.
unsafe {
self.inner.slot(tail).write(value);
}
// Move the tail one slot forward.
tail = self.inner.increment(tail);
self.inner.tail.store(tail, Ordering::Release);
self.tail.set(tail);
Ok(())
}
/// Returns the capacity of the queue.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
///
/// assert_eq!(p.capacity(), 100);
/// ```
pub fn capacity(&self) -> usize {
self.inner.cap
}
/// Returns `true` if the queue is empty.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(100);
///
/// assert!(p.is_empty());
/// p.push(1).unwrap();
/// assert!(!p.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
self.len() == 0
}
/// Returns `true` if the queue is full.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(1);
///
/// assert!(!p.is_full());
/// p.push(1).unwrap();
/// assert!(p.is_full());
/// ```
pub fn is_full(&self) -> bool {
self.len() == self.inner.cap
}
/// Returns the number of elements in the queue.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(100);
/// assert_eq!(p.len(), 0);
///
/// p.push(10).unwrap();
/// assert_eq!(p.len(), 1);
///
/// p.push(20).unwrap();
/// assert_eq!(p.len(), 2);
/// ```
pub fn len(&self) -> usize {
let head = self.inner.head.load(Ordering::Acquire);
let tail = self.tail.get();
self.head.set(head);
self.inner.distance(head, tail)
}
}
impl<T> fmt::Debug for Producer<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Producer { .. }")
}
}
/// The consumer side of a bounded single-producer single-consumer queue.
///
/// # Examples
///
/// ```
/// use rustasim::{spsc, PopError};
///
/// let (p, c) = spsc::new(1);
/// assert_eq!(p.push(10), Ok(()));
///
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(c.pop(), Err(PopError));
///
/// assert!(c.is_empty());
/// assert!(!c.is_full());
/// ```
pub struct Consumer<T> {
/// The inner representation of the queue.
inner: Arc<Inner<T>>,
/// A copy of `inner.head` for quick access.
///
/// This value is always in sync with `inner.head`.
head: Cell<usize>,
/// A copy of `inner.tail` for quick access.
///
/// This value can be stale and sometimes needs to be resynchronized with `inner.tail`.
tail: Cell<usize>,
}
unsafe impl<T: Send> Send for Consumer<T> {}
impl<T> Consumer<T> {
/// Attempts to pop an element from the queue.
///
/// If the queue is empty, an error is returned.
///
/// # Examples
///
/// ```
/// use rustasim::{spsc, PopError};
///
/// let (p, c) = spsc::new(1);
/// assert_eq!(p.push(10), Ok(()));
///
/// assert_eq!(c.pop(), Ok(10));
/// assert_eq!(c.pop(), Err(PopError));
/// ```
pub fn pop(&self) -> Result<T, PopError> {
let mut head = self.head.get();
let mut tail = self.tail.get();
// Check if the queue is *possibly* empty.
if head == tail {
// We need to refresh the tail and check again if the queue is *really* empty.
tail = self.inner.tail.load(Ordering::Acquire);
self.tail.set(tail);
// Is the queue *really* empty?
if head == tail {
return Err(PopError);
}
}
// Read the value from the head slot.
let value = unsafe { self.inner.slot(head).read() };
// Move the head one slot forward.
head = self.inner.increment(head);
self.inner.head.store(head, Ordering::Release);
self.head.set(head);
Ok(value)
}
/// Returns the capacity of the queue.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new::<i32>(100);
///
/// assert_eq!(c.capacity(), 100);
/// ```
pub fn capacity(&self) -> usize {
self.inner.cap
}
/// Returns `true` if the queue is empty.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(100);
///
/// assert!(c.is_empty());
/// p.push(1).unwrap();
/// assert!(!c.is_empty());
/// ```
pub fn is_empty(&self) -> bool {
!(self.head != self.tail || self.len() != 0)
}
/// Returns `true` if the queue is full.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(1);
///
/// assert!(!c.is_full());
/// p.push(1).unwrap();
/// assert!(c.is_full());
/// ```
pub fn is_full(&self) -> bool {
self.len() == self.inner.cap
}
/// waits until queue can be popped
pub fn wait(&self) {
let head = self.head.get();
let mut tail = self.inner.tail.load(Ordering::Acquire);
// we're good, stop
if head != tail {
self.tail.set(tail);
return;
}
// we need to wait
loop {
//thread::yield_now();
tail = self.inner.tail.load(Ordering::Acquire);
if head != tail {
break;
}
}
self.tail.set(tail);
}
/// Returns the number of elements in the queue.
///
/// # Examples
///
/// ```
/// use rustasim::spsc;
///
/// let (p, c) = spsc::new(100);
/// assert_eq!(c.len(), 0);
///
/// p.push(10).unwrap();
/// assert_eq!(c.len(), 1);
///
/// p.push(20).unwrap();
/// assert_eq!(c.len(), 2);
/// ```
pub fn len(&self) -> usize {
let head = self.head.get();
let tail = self.inner.tail.load(Ordering::Acquire);
self.tail.set(tail);
self.inner.distance(head, tail)
}
}
impl<T> fmt::Debug for Consumer<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Consumer { .. }")
}
}