#[cfg(feature = "async")]
mod r#async;
#[cfg(feature = "async")]
pub use r#async::{AsyncConsumer, AsyncProducer, async_spsc};
use std::cell::UnsafeCell;
use std::mem::MaybeUninit;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
/// Wrapper that forces its contents onto a 64-byte alignment boundary.
///
/// Used in this file for the ring's `head` and `flush` counters.
// NOTE(review): presumably 64 is the target's cache-line size and the goal is
// to keep the two counters on separate lines — confirm for the supported targets.
#[repr(align(64))]
pub(crate) struct Padded<T>(pub(crate) T);
/// Shared state of the queue: the slot storage plus the two position counters
/// exchanged between the producer and consumer handles.
pub(crate) struct Ring<T> {
/// Slot storage. A position `p` lives in slot `p & mask`; slots hold
/// possibly-uninitialized values and are accessed through `UnsafeCell`.
pub(crate) buf: Box<[UnsafeCell<MaybeUninit<T>>]>,
/// Slot count minus one (the slot count is a power of two).
pub(crate) mask: usize,
/// Position published by `release`; read by the producer-side methods to
/// learn how far slots have been handed back.
pub(crate) head: Padded<AtomicUsize>,
/// Position published by the producer's flush operations; read by the
/// consumer-side methods to learn how far elements are visible.
pub(crate) flush: Padded<AtomicUsize>,
/// Set to `true` when the `Producer` handle is dropped.
pub(crate) producer_dropped: AtomicBool,
/// Only present with the `async` feature; initialized to `false` and not
/// written anywhere in this file.
// NOTE(review): presumably set by the `async` module when its consumer is dropped — confirm.
#[cfg(feature = "async")]
pub(crate) consumer_dropped: AtomicBool,
}
// `Ring` holds `UnsafeCell` slots and is therefore not `Sync` automatically.
// SAFETY (assumed contract — confirm against every user of `Ring` in the crate):
// at most one thread calls the producer-side methods and at most one thread
// calls the consumer-side methods, and slot ownership is handed over only
// through the Release/Acquire accesses on `flush` and `head`. Values of `T`
// move between threads, hence the `T: Send` bound.
unsafe impl<T: Send> Send for Ring<T> {}
unsafe impl<T: Send> Sync for Ring<T> {}
impl<T> Ring<T> {
    /// Creates an empty ring with room for at least `capacity` elements.
    ///
    /// The slot count is rounded up to the next power of two so a position can
    /// be reduced to a slot index with a bit mask.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero, or if rounding it up to a power of two
    /// does not fit in `usize`.
    pub(crate) fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be > 0");
        // `next_power_of_two` yields 0 on overflow in release builds, which
        // would give an empty buffer with an all-ones mask; fail loudly instead.
        let cap = capacity
            .checked_next_power_of_two()
            .expect("capacity overflows usize when rounded up to a power of two");
        let buf: Vec<UnsafeCell<MaybeUninit<T>>> = (0..cap)
            .map(|_| UnsafeCell::new(MaybeUninit::uninit()))
            .collect();
        Self {
            buf: buf.into_boxed_slice(),
            mask: cap - 1,
            head: Padded(AtomicUsize::new(0)),
            flush: Padded(AtomicUsize::new(0)),
            producer_dropped: AtomicBool::new(false),
            #[cfg(feature = "async")]
            consumer_dropped: AtomicBool::new(false),
        }
    }

    /// Number of slots in the ring (always a power of two).
    pub(crate) fn capacity(&self) -> usize {
        self.mask + 1
    }

    /// Writes `val` at position `*tail` and advances `*tail`.
    ///
    /// `cached_head` is the caller's last known value of the shared head; it is
    /// refreshed from the atomic only when the ring looks full. Returns
    /// `Err(val)` when the ring is still full after the refresh. The element
    /// does not become visible to the consumer until `flush` is stored.
    #[inline]
    pub(crate) fn push(&self, tail: &mut usize, cached_head: &mut usize, val: T) -> Result<(), T> {
        if self.is_full(*tail, cached_head) {
            return Err(val);
        }
        // SAFETY: the check above shows fewer than `capacity` positions lie
        // between the released head and `*tail`, so this slot holds no live
        // value. Sound only while a single producer drives `tail` for this
        // ring (crate-internal contract).
        unsafe {
            (*self.buf[*tail & self.mask].get()).write(val);
        }
        // Positions are free-running counters; they are allowed to wrap.
        *tail = (*tail).wrapping_add(1);
        Ok(())
    }

    /// Publishes everything pushed up to `tail` and reports what happened.
    ///
    /// Returns `NothingToFlush` when `tail` equals the already published
    /// position. Otherwise refreshes `cached_head`, stores `tail` into `flush`
    /// and returns the number of newly published elements together with
    /// whether the previously published position equalled the head that was
    /// just loaded (i.e. the consumer had released everything published so far).
    #[inline]
    pub(crate) fn flush_to(&self, tail: usize, cached_head: &mut usize) -> FlushResult {
        // Relaxed load: this method is the one that stores `flush`.
        let prev_flush = self.flush.0.load(Ordering::Relaxed);
        if tail == prev_flush {
            return FlushResult::NothingToFlush;
        }
        let count = tail.wrapping_sub(prev_flush);
        *cached_head = self.head.0.load(Ordering::Acquire);
        let was_empty = prev_flush == *cached_head;
        self.flush.0.store(tail, Ordering::Release);
        FlushResult::Flushed { count, was_empty }
    }

    /// Returns `true` when no slot is free for position `tail`.
    ///
    /// Uses `cached_head` first and reloads the shared head only when the
    /// cached value suggests the ring is full.
    #[inline]
    pub(crate) fn is_full(&self, tail: usize, cached_head: &mut usize) -> bool {
        if tail.wrapping_sub(*cached_head) >= self.capacity() {
            *cached_head = self.head.0.load(Ordering::Acquire);
            tail.wrapping_sub(*cached_head) >= self.capacity()
        } else {
            false
        }
    }

    /// Number of positions between the shared head and `tail`, i.e. elements
    /// pushed (flushed or not) whose slots have not been released yet.
    #[inline]
    pub(crate) fn producer_len(&self, tail: usize) -> usize {
        tail.wrapping_sub(self.head.0.load(Ordering::Acquire))
    }

    /// Returns `true` when the shared head has caught up with `tail`.
    #[inline]
    pub(crate) fn producer_is_empty(&self, tail: usize) -> bool {
        tail == self.head.0.load(Ordering::Acquire)
    }

    /// Moves the element at position `*head` out of the ring and advances
    /// `*head`, or returns `None` when `*head` has reached `cached_flush`.
    ///
    /// The slot is not handed back to the producer until `release` is called.
    #[inline]
    pub(crate) fn pop(&self, head: &mut usize, cached_flush: usize) -> Option<T> {
        if *head == cached_flush {
            return None;
        }
        // SAFETY: `*head` has not reached `cached_flush`, so (assuming
        // `cached_flush` was obtained from `prefetch`) the slot was written
        // before that flush position was published. Sound only while a single
        // consumer drives `head` for this ring (crate-internal contract).
        let val = unsafe { (*self.buf[*head & self.mask].get()).assume_init_read() };
        *head = (*head).wrapping_add(1);
        Some(val)
    }

    /// Publishes `head` so the producer may reuse every slot before it.
    #[inline]
    pub(crate) fn release(&self, head: usize) {
        self.head.0.store(head, Ordering::Release);
    }

    /// Reloads the published flush position into `cached_flush` and returns
    /// how far it advanced since the previous cached value.
    #[inline]
    pub(crate) fn prefetch(&self, cached_flush: &mut usize) -> usize {
        let new_flush = self.flush.0.load(Ordering::Acquire);
        let count = new_flush.wrapping_sub(*cached_flush);
        *cached_flush = new_flush;
        count
    }

    /// Returns `true` when the consumer has read up to its cached flush
    /// position and the published flush position has not moved past `head`.
    #[inline]
    pub(crate) fn consumer_is_empty(&self, head: usize, cached_flush: usize) -> bool {
        head == cached_flush && self.flush.0.load(Ordering::Acquire) == head
    }

    /// Number of published elements at or after position `head`.
    #[inline]
    pub(crate) fn consumer_len(&self, head: usize) -> usize {
        self.flush.0.load(Ordering::Acquire).wrapping_sub(head)
    }

    /// Drops every element between the stored `head` and `flush` positions.
    ///
    /// `head` is advanced past each element before its destructor runs, so a
    /// repeated call (or the `Drop` impl running afterwards) never drops an
    /// element twice, even if a destructor panics.
    pub(crate) fn drop_remaining(&mut self) {
        let head = *self.head.0.get_mut();
        let flush = *self.flush.0.get_mut();
        let pending = flush.wrapping_sub(head);
        if pending > self.capacity() {
            // Positions further apart than the slot count cannot describe live
            // slots; leak rather than read uninitialized memory.
            return;
        }
        for offset in 0..pending {
            let pos = head.wrapping_add(offset);
            // Mark the element as consumed first (see the doc comment).
            *self.head.0.get_mut() = pos.wrapping_add(1);
            // SAFETY: positions in `head..flush` were written by `push` and
            // published by a flush, and have not been moved out by `pop` as
            // far as the stored `head` records; `&mut self` rules out
            // concurrent access.
            unsafe {
                self.buf[pos & self.mask].get_mut().assume_init_drop();
            }
        }
    }
}
/// Dropping the ring drops whatever elements `drop_remaining` still finds in it.
impl<T> Drop for Ring<T> {
    fn drop(&mut self) {
        Self::drop_remaining(self);
    }
}
/// Outcome of `Ring::flush_to` / `Producer::flush_and_check`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushResult {
/// New elements were published. `count` is how far the published position
/// advanced; `was_empty` is `true` when the previously published position
/// equalled the consumer's released head at the time of the flush.
Flushed { count: usize, was_empty: bool },
/// The producer's tail already equalled the published position.
NothingToFlush,
}
/// Writing half of the queue returned by `spsc`.
pub struct Producer<T> {
/// Ring shared with the matching `Consumer`.
ring: Arc<Ring<T>>,
/// Position of the next element to write; local to this handle until flushed.
tail: usize,
/// Last value of the ring's `head` seen by this handle.
cached_head: usize,
}
impl<T> Producer<T> {
pub fn ring_addr(&self) -> usize {
Arc::as_ptr(&self.ring) as usize
}
}
/// Dropping the producer publishes everything pushed so far and then raises
/// the `producer_dropped` flag.
impl<T> Drop for Producer<T> {
    fn drop(&mut self) {
        let shared = &*self.ring;
        let final_tail = self.tail;
        // Order matters: the final flush position is stored before the flag,
        // so a reader that observes the flag also observes that position.
        shared.flush.0.store(final_tail, Ordering::Release);
        shared.producer_dropped.store(true, Ordering::Release);
    }
}
// SAFETY: a `Producer` owns only an `Arc<Ring<T>>` and two `usize` values, and
// `Ring<T>` is declared `Send + Sync` for `T: Send` in this file.
// NOTE(review): with those `Ring` impls the auto trait should already apply,
// so this explicit impl looks redundant — confirm before removing.
unsafe impl<T: Send> Send for Producer<T> {}
/// Reading half of the queue returned by `spsc`.
pub struct Consumer<T> {
/// Ring shared with the matching `Producer`.
ring: Arc<Ring<T>>,
/// Position of the next element to read; local to this handle until released.
head: usize,
/// Last value of the ring's `flush` position seen by this handle.
cached_flush: usize,
}
impl<T> Consumer<T> {
pub fn ring_addr(&self) -> usize {
Arc::as_ptr(&self.ring) as usize
}
}
// SAFETY: a `Consumer` owns only an `Arc<Ring<T>>` and two `usize` values, and
// `Ring<T>` is declared `Send + Sync` for `T: Send` in this file.
// NOTE(review): with those `Ring` impls the auto trait should already apply,
// so this explicit impl looks redundant — confirm before removing.
unsafe impl<T: Send> Send for Consumer<T> {}
/// Creates a queue and returns its two halves.
///
/// Both handles share one ring built by `Ring::new(capacity)` and start at
/// position zero.
pub fn spsc<T>(capacity: usize) -> (Producer<T>, Consumer<T>) {
    let shared = Arc::new(Ring::new(capacity));
    let producer = Producer {
        ring: Arc::clone(&shared),
        tail: 0,
        cached_head: 0,
    };
    let consumer = Consumer {
        ring: shared,
        head: 0,
        cached_flush: 0,
    };
    (producer, consumer)
}
impl<T> Producer<T> {
#[inline]
pub fn push(&mut self, val: T) -> Result<(), T> {
self.ring.push(&mut self.tail, &mut self.cached_head, val)
}
#[inline]
pub fn flush(&mut self) {
self.ring.flush.0.store(self.tail, Ordering::Release);
}
#[inline]
pub fn flush_and_check(&mut self) -> FlushResult {
self.ring.flush_to(self.tail, &mut self.cached_head)
}
#[inline]
pub fn push_and_flush(&mut self, val: T) -> Result<(), T> {
self.push(val)?;
self.flush();
Ok(())
}
#[inline]
pub fn is_full(&mut self) -> bool {
self.ring.is_full(self.tail, &mut self.cached_head)
}
#[inline]
pub fn capacity(&self) -> usize {
self.ring.capacity()
}
#[inline]
pub fn len(&self) -> usize {
self.ring.producer_len(self.tail)
}
#[inline]
pub fn is_empty(&self) -> bool {
self.ring.producer_is_empty(self.tail)
}
}
impl<T> Consumer<T> {
#[inline]
pub fn pop(&mut self) -> Option<T> {
self.ring.pop(&mut self.head, self.cached_flush)
}
#[inline]
pub fn release(&mut self) {
self.ring.release(self.head);
}
#[inline]
pub fn prefetch(&mut self) -> usize {
self.ring.prefetch(&mut self.cached_flush)
}
#[inline]
pub fn prefetch_and_pop(&mut self) -> Option<T> {
if self.head == self.cached_flush {
self.prefetch();
}
let val = self.pop();
if val.is_some() {
self.release();
}
val
}
#[inline]
pub fn is_empty(&self) -> bool {
self.ring.consumer_is_empty(self.head, self.cached_flush)
}
#[inline]
pub fn is_disconnected(&self) -> bool {
self.ring.producer_dropped.load(Ordering::Acquire)
&& self.head == self.ring.flush.0.load(Ordering::Acquire)
}
#[inline]
pub fn capacity(&self) -> usize {
self.ring.capacity()
}
#[inline]
pub fn len(&self) -> usize {
self.ring.consumer_len(self.head)
}
}
/// Dropping the consumer releases every slot it has read, so the ring's
/// stored head reflects what was actually consumed.
impl<T> Drop for Consumer<T> {
    fn drop(&mut self) {
        Self::release(self);
    }
}
/// Debug output shows only the capacity and marks the struct as non-exhaustive,
/// so `T` does not need to implement `Debug`.
impl<T> std::fmt::Debug for Producer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let capacity = self.capacity();
        let mut out = f.debug_struct("Producer");
        out.field("capacity", &capacity);
        out.finish_non_exhaustive()
    }
}
/// Debug output shows only the capacity and marks the struct as non-exhaustive,
/// so `T` does not need to implement `Debug`.
impl<T> std::fmt::Debug for Consumer<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let capacity = self.capacity();
        let mut out = f.debug_struct("Consumer");
        out.field("capacity", &capacity);
        out.finish_non_exhaustive()
    }
}
// Unit tests for the synchronous producer/consumer API.
#[cfg(test)]
mod tests {
use super::*;
// Pushed elements stay invisible to the consumer until the producer flushes.
#[test]
fn push_pop_basic() {
let (mut p, mut c) = spsc::<u32>(4);
assert!(c.prefetch_and_pop().is_none());
p.push(1).unwrap();
p.push(2).unwrap();
// Not flushed yet, so nothing can be popped.
assert!(c.prefetch_and_pop().is_none());
p.flush();
assert_eq!(c.prefetch_and_pop(), Some(1));
assert_eq!(c.prefetch_and_pop(), Some(2));
assert!(c.prefetch_and_pop().is_none());
}
// `push_and_flush` makes the element visible immediately.
#[test]
fn push_and_flush() {
let (mut p, mut c) = spsc::<u32>(4);
p.push_and_flush(42).unwrap();
assert_eq!(c.prefetch_and_pop(), Some(42));
}
// `prefetch` reports the number of newly published elements, which can then
// be popped without further refreshes.
#[test]
fn batch_prefetch() {
let (mut p, mut c) = spsc::<u32>(8);
for i in 0..5 {
p.push(i).unwrap();
}
// Nothing is visible before the flush; afterwards all five are.
assert_eq!(c.prefetch(), 0); p.flush();
assert_eq!(c.prefetch(), 5);
for i in 0..5 {
assert_eq!(c.pop(), Some(i));
}
assert!(c.pop().is_none());
}
// `flush_and_check` reports the flushed count, and `was_empty` on the first
// flush of a fresh queue.
#[test]
fn flush_and_check_reports_was_empty() {
let (mut p, mut c) = spsc::<u32>(4);
p.push(1).unwrap();
let r = p.flush_and_check();
assert_eq!(
r,
FlushResult::Flushed {
count: 1,
was_empty: true
}
);
p.push(2).unwrap();
let r = p.flush_and_check();
// Only the count is checked here; `was_empty` is left unconstrained.
assert!(matches!(r, FlushResult::Flushed { count: 1, .. }));
c.prefetch_and_pop();
c.prefetch_and_pop();
p.push(3).unwrap();
p.push(4).unwrap();
// The results of these two pushes are deliberately ignored.
let _ = p.push(5);
let _ = p.push(6);
let r = p.flush_and_check();
assert!(matches!(r, FlushResult::Flushed { .. }));
}
// A full ring rejects pushes until the consumer pops and releases a slot.
#[test]
fn full_ring() {
let (mut p, mut c) = spsc::<u32>(4);
for i in 0..4 {
p.push(i).unwrap();
}
assert!(p.push(99).is_err());
p.flush();
assert_eq!(c.prefetch_and_pop(), Some(0));
// One slot was released, so the push now succeeds.
p.push(99).unwrap();
p.flush();
for i in 1..=4 {
let expected = if i < 4 { i } else { 99 };
assert_eq!(c.prefetch_and_pop(), Some(expected));
}
}
// Positions keep increasing past the slot count; order is preserved across
// many trips around a two-slot ring.
#[test]
fn wraps_around() {
let (mut p, mut c) = spsc::<u32>(2);
for round in 0..100 {
p.push(round * 2).unwrap();
p.push(round * 2 + 1).unwrap();
p.flush();
assert_eq!(c.prefetch_and_pop(), Some(round * 2));
assert_eq!(c.prefetch_and_pop(), Some(round * 2 + 1));
}
}
// Requested capacities are rounded up to the next power of two.
#[test]
fn capacity_rounds_up() {
let (p, _c) = spsc::<u8>(3);
assert_eq!(p.capacity(), 4);
let (p, _c) = spsc::<u8>(5);
assert_eq!(p.capacity(), 8);
let (p, _c) = spsc::<u8>(1);
assert_eq!(p.capacity(), 1);
}
// Elements still in the ring are dropped exactly once when both handles go away.
#[test]
fn drop_remaining() {
use std::sync::atomic::AtomicUsize;
static DROPS: AtomicUsize = AtomicUsize::new(0);
#[derive(Debug)]
struct Counted;
impl Drop for Counted {
fn drop(&mut self) {
DROPS.fetch_add(1, Ordering::Relaxed);
}
}
DROPS.store(0, Ordering::Relaxed);
let (mut p, c) = spsc::<Counted>(4);
p.push(Counted).unwrap();
p.push(Counted).unwrap();
p.push(Counted).unwrap();
p.flush();
drop(p);
drop(c);
assert_eq!(DROPS.load(Ordering::Relaxed), 3);
}
// A producer thread and the consuming test thread exchange 100_000 values in order.
#[test]
fn cross_thread() {
let (mut p, mut c) = spsc::<u64>(1024);
let n = 100_000u64;
let sender = std::thread::spawn(move || {
for i in 0..n {
// Retry while the ring is full, flushing so the consumer can make progress.
while p.push(i).is_err() {
p.flush();
std::thread::yield_now();
}
p.flush();
}
});
let mut received = 0u64;
while received < n {
if c.prefetch() > 0 {
while let Some(v) = c.pop() {
assert_eq!(v, received);
received += 1;
}
// Release the whole batch at once.
c.release();
} else {
std::thread::yield_now();
}
}
sender.join().unwrap();
}
// After the consumer has drained more elements than the ring has slots, a
// flush still reports `was_empty: true`.
#[test]
fn flush_and_check_was_empty_after_consumer_drains() {
let (mut p, mut c) = spsc::<u32>(4);
for i in 0..5 {
p.push_and_flush(i).unwrap();
let val = c.prefetch_and_pop().unwrap();
assert_eq!(val, i);
}
p.push(99).unwrap();
let r = p.flush_and_check();
assert_eq!(
r,
FlushResult::Flushed {
count: 1,
was_empty: true,
}
);
}
}