use std::cell::UnsafeCell;
use std::ptr;
use std::sync::atomic::{AtomicPtr, Ordering};
/// One heap-allocated link of the queue's singly linked list.
struct Node<T> {
/// Payload slot. It is `None` for the stub node and becomes `None` again
/// once `try_pop` has moved the value out.
value: UnsafeCell<Option<T>>,
/// Pointer to the node pushed after this one; null until the producer that
/// replaced this node as `head` stores its own node here in `push`.
next: AtomicPtr<Node<T>>,
}
/// Outcome of a single [`MpscQueue::try_pop`] call.
///
/// `Debug`, `PartialEq` and `Eq` are derived so results can be logged and
/// compared directly (for example in `assert_eq!`).
#[derive(Debug, PartialEq, Eq)]
pub enum PopResult<T> {
    /// A value was dequeued.
    Some(T),
    /// No successor is linked after the current tail and `head` still equals
    /// the tail: the queue holds no values.
    Empty,
    /// The queue could not produce a value right now although it is not
    /// provably empty: either `head` has already moved past the tail while the
    /// link to the new node is not visible yet, or the successor's value slot
    /// was unexpectedly empty.
    Inconsistent,
}
/// Linked-list queue in which `push` takes `&self` (and allocates one node per
/// value) while `try_pop` takes `&mut self`.
pub struct MpscQueue<T> {
/// Most recently pushed node, or the stub while nothing has been pushed.
/// `push` atomically swaps its new node in here.
head: AtomicPtr<Node<T>>,
/// Node whose value was handed out last (initially the stub). The next value
/// to pop lives in this node's successor. Only `try_pop` reads or writes it.
tail: UnsafeCell<*mut Node<T>>,
/// Valueless placeholder that `head` and `tail` both point at after `new`.
/// It is owned by this box; `try_pop` never frees it through a raw pointer.
stub: Box<Node<T>>,
}
// SAFETY: through a shared reference only `push` is reachable; it touches the
// atomic `head`, the atomic `next` link of the previous head, and a node it
// has just allocated. The non-atomic `tail` cell is accessed by `try_pop`,
// which requires `&mut self`. `T: Send` is required because pushed values are
// handed over to whichever thread pops them.
unsafe impl<T: Send> Sync for MpscQueue<T> {}
// SAFETY: the queue owns its nodes and the values inside them, so moving the
// queue to another thread moves those `T` values with it; `T: Send` covers that.
unsafe impl<T: Send> Send for MpscQueue<T> {}
impl<T> MpscQueue<T> {
    /// Creates an empty queue.
    ///
    /// A valueless stub node is allocated on the heap, and both `head` and
    /// `tail` start out pointing at it.
    pub fn new() -> Self {
        let stub = Box::new(Node {
            value: UnsafeCell::new(None),
            next: AtomicPtr::new(ptr::null_mut()),
        });
        // The node lives on the heap, so this address stays the same when the
        // box itself is moved into the struct below.
        let sentinel = &*stub as *const Node<T> as *mut Node<T>;
        let head = AtomicPtr::new(sentinel);
        let tail = UnsafeCell::new(sentinel);
        Self { head, tail, stub }
    }

    /// Appends `value` to the queue.
    ///
    /// Allocates a node, swaps it in as the new `head`, then links the
    /// previous head to it. Between those two steps the new node is not yet
    /// reachable from `tail`; `try_pop` reports that window as
    /// [`PopResult::Inconsistent`].
    pub fn push(&self, value: T) {
        let fresh = Box::new(Node {
            value: UnsafeCell::new(Some(value)),
            next: AtomicPtr::new(ptr::null_mut()),
        });
        let fresh = Box::into_raw(fresh);
        let previous = self.head.swap(fresh, Ordering::AcqRel);
        // SAFETY: `previous` is either the stub (owned by `self.stub`) or a
        // node created by an earlier `push`. `try_pop` frees a node only after
        // observing a non-null `next` on it, i.e. only after this store.
        unsafe { (*previous).next.store(fresh, Ordering::Release) };
    }

    /// Tries to dequeue the oldest value.
    ///
    /// Returns
    /// * [`PopResult::Some`] with the value when the tail has a linked
    ///   successor that still holds one,
    /// * [`PopResult::Empty`] when there is no successor and `head` equals
    ///   the tail,
    /// * [`PopResult::Inconsistent`] when there is no visible successor but
    ///   `head` has already moved on, or when the successor's value slot is
    ///   empty.
    ///
    /// The node left behind by advancing the tail is freed, unless it is the
    /// stub, which stays owned by the queue.
    pub fn try_pop(&mut self) -> PopResult<T> {
        let current = *self.tail.get_mut();
        // SAFETY: `current` is the stub or a node from `push` that has not
        // been freed yet; nodes are released only further down, after the tail
        // has moved past them.
        let successor = unsafe { (*current).next.load(Ordering::Acquire) };
        let sentinel = &*self.stub as *const Node<T> as *mut Node<T>;

        if successor.is_null() {
            // Nothing linked after the tail: either truly empty, or a producer
            // has swapped `head` but not stored the link yet.
            return if self.head.load(Ordering::Acquire) == current {
                PopResult::Empty
            } else {
                PopResult::Inconsistent
            };
        }

        *self.tail.get_mut() = successor;
        if current != sentinel {
            // SAFETY: `current` came from `Box::into_raw` in `push`, the tail
            // no longer refers to it, and the producer that linked `successor`
            // has finished its store into `current.next`.
            drop(unsafe { Box::from_raw(current) });
        }

        // SAFETY: `successor` is a live node published by the `Release` store
        // in `push` and observed by the `Acquire` load above; `&mut self`
        // rules out a second consumer touching the slot.
        match unsafe { ptr::replace((*successor).value.get(), None) } {
            Some(taken) => PopResult::Some(taken),
            None => PopResult::Inconsistent,
        }
    }
}
impl<T> Default for MpscQueue<T> {
fn default() -> Self {
Self::new()
}
}
impl<T> Drop for MpscQueue<T> {
fn drop(&mut self) {
while let PopResult::Some(_) = self.try_pop() {}
}
}
// The `recipe` module is declared only when the `harness` feature is enabled.
#[cfg(feature = "harness")]
pub mod recipe;
// The `features` module is declared as soon as any one of the features listed
// below is enabled.
#[cfg(any(
feature = "mpmc",
feature = "bounded",
feature = "batch",
feature = "metrics",
feature = "affinity",
))]
pub mod features;
// Re-exports from `features`, each gated on its own feature flag.
#[cfg(feature = "affinity")]
pub use features::affinity::{AffinityError, set_affinity};
#[cfg(feature = "batch")]
pub use features::batch::BatchMpscQueue;
#[cfg(feature = "bounded")]
pub use features::bounded::BoundedMpscQueue;
#[cfg(feature = "metrics")]
pub use features::metrics::{MetricsMpscQueue, QueueMetricsSnapshot};
#[cfg(feature = "mpmc")]
pub use features::mpmc::MpmcQueue;