#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![warn(missing_docs)]
use core::{cmp, fmt, mem::MaybeUninit, ops, ptr};
#[macro_use]
mod macros;
mod loom;
pub mod mpsc;
pub mod recycling;
mod util;
mod wait;
pub use self::recycling::Recycle;
feature! {
#![all(feature = "static", not(all(loom, test)))]
mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;
}
feature! {
#![feature = "alloc"]
extern crate alloc;
mod thingbuf;
pub use self::thingbuf::ThingBuf;
}
use crate::{
loom::{
atomic::{AtomicUsize, Ordering::*},
cell::{MutPtr, UnsafeCell},
},
mpsc::errors::{TryRecvError, TrySendError},
util::{Backoff, CachePadded},
};
// The highest bit of a slot's state word, set while a popped `Ref` is
// outstanding for that slot (set in `Core::pop_ref`, cleared in
// `Ref::drop`). Because this bit is reserved, indices/generations may only
// use the remaining bits.
const HAS_READER: usize = 1 << (usize::BITS - 1);
/// The maximum capacity of a queue: all `usize` bits except the reserved
/// `HAS_READER` bit.
pub const MAX_CAPACITY: usize = usize::MAX & !HAS_READER;
/// A reference to an entry in a queue.
///
/// While a `Ref` exists, it holds exclusive access to its slot's value; the
/// slot's state word is updated when the `Ref` is dropped (see the `Drop`
/// impl), releasing the slot back to producers/consumers.
pub struct Ref<'slot, T> {
    /// Pointer to the slot's (possibly uninitialized) value.
    ptr: MutPtr<MaybeUninit<T>>,
    /// The slot this reference points into; used to update its state on drop.
    slot: &'slot Slot<T>,
    /// The state word to store into the slot when a *pushed* `Ref` is dropped.
    new_state: usize,
    /// `true` if this `Ref` came from `pop_ref` (reader side), `false` if it
    /// came from `push_ref` (writer side); selects the drop behavior.
    is_pop: bool,
}
/// Error returned when a queue is at capacity and a value could not be
/// pushed. Holds the rejected value (if any), recoverable via
/// [`Full::into_inner`].
#[derive(PartialEq, Eq)]
pub struct Full<T = ()>(T);
/// The core state shared by all queue implementations in this crate: the
/// atomic head/tail words plus the precomputed masks used to unpack an
/// index + generation from a single `usize`.
#[derive(Debug)]
struct Core {
    /// Word (index | generation) of the next slot to pop.
    head: CachePadded<AtomicUsize>,
    /// Word (index | generation) of the next slot to push; also carries the
    /// `closed` bit.
    tail: CachePadded<AtomicUsize>,
    /// The amount to add to a word to advance it by one full generation.
    gen: usize,
    /// Mask extracting the generation bits from a word.
    gen_mask: usize,
    /// Mask extracting the slot index from a word.
    idx_mask: usize,
    /// The bit in `tail` that marks the queue as closed.
    closed: usize,
    /// Number of usable slots (may be less than the power-of-two index space).
    capacity: usize,
    /// Set by `drop_slots` so the slot values are never dropped twice
    /// (checked by `Core`'s `Drop` impl).
    has_dropped_slots: bool,
}
/// One entry in the ring buffer: a (possibly uninitialized) value guarded by
/// an atomic state word that coordinates producers and consumers.
struct Slot<T> {
    value: UnsafeCell<MaybeUninit<T>>,
    /// The slot's state: the head/tail word at which this slot may next be
    /// claimed, with the `HAS_READER` bit set while a popped `Ref` is live.
    state: AtomicUsize,
}
impl Core {
    /// Creates the core state for a queue with the given `capacity`.
    ///
    /// The index space is `(capacity + 1).next_power_of_two()` wide, so the
    /// slot index, the closed bit, and the lap ("generation") counter can all
    /// be packed into one `usize` and extracted with the precomputed masks.
    #[cfg(not(all(loom, test)))]
    const fn new(capacity: usize) -> Self {
        assert!(capacity <= MAX_CAPACITY);
        // `closed` is the lowest bit above the index range; everything above
        // the index bits and the closed bit is the generation counter.
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            gen_mask,
            closed,
            idx_mask,
            capacity,
            has_dropped_slots: false,
        }
    }

    /// Loom-testing variant of [`Core::new`]; a separate non-`const`
    /// definition because loom's `AtomicUsize` cannot be constructed in a
    /// `const fn`.
    #[cfg(all(loom, test))]
    fn new(capacity: usize) -> Self {
        // Match the non-loom constructor's capacity check.
        assert!(capacity <= MAX_CAPACITY);
        let closed = (capacity + 1).next_power_of_two();
        let idx_mask = closed - 1;
        let gen = closed << 1;
        let gen_mask = !(closed | idx_mask);
        Self {
            head: CachePadded(AtomicUsize::new(0)),
            tail: CachePadded(AtomicUsize::new(0)),
            gen,
            closed,
            gen_mask,
            idx_mask,
            capacity,
            // BUGFIX: this initializer was previously gated on
            // `#[cfg(debug_assertions)]`, but the `has_dropped_slots` field
            // is declared unconditionally, so the struct literal failed to
            // compile whenever debug assertions were disabled. Initialize it
            // unconditionally, matching the non-loom constructor.
            has_dropped_slots: false,
        }
    }

    /// Unpacks a head/tail word into `(slot index, generation)`.
    #[inline(always)]
    fn idx_gen(&self, val: usize) -> (usize, usize) {
        (val & self.idx_mask, val & self.gen_mask)
    }

    /// Returns the word following `(idx, gen)`: the next index in the same
    /// generation, or index 0 of the next generation once the last usable
    /// slot is passed.
    #[inline]
    fn next(&self, idx: usize, gen: usize) -> usize {
        if idx + 1 < self.capacity() {
            (idx | gen) + 1
        } else {
            wrapping_add(gen, self.gen)
        }
    }

    /// The number of usable slots in the queue.
    #[inline]
    fn capacity(&self) -> usize {
        self.capacity
    }

    /// Sets the closed bit in `tail`; returns `true` if this call was the one
    /// that transitioned the queue to closed. No-op while panicking, since
    /// `close` may be called from a `Drop` impl.
    fn close(&self) -> bool {
        test_println!("Core::close");
        if crate::util::panic::panicking() {
            return false;
        }
        test_dbg!(self.tail.fetch_or(self.closed, SeqCst) & self.closed == 0)
    }

    /// Attempts to claim the slot at `tail` for writing, advancing `tail`.
    ///
    /// On success, the claimed slot's value is initialized (first generation)
    /// or recycled via `recycle`, and a write `Ref` is returned. Returns
    /// `TrySendError::Closed` if the closed bit is set, or
    /// `TrySendError::Full` when no slot is available.
    #[inline(always)]
    fn push_ref<'slots, T, R>(
        &self,
        slots: &'slots [Slot<T>],
        recycle: &R,
    ) -> Result<Ref<'slots, T>, TrySendError<()>>
    where
        R: Recycle<T>,
    {
        test_println!("push_ref");
        let mut backoff = Backoff::new();
        let mut tail = test_dbg!(self.tail.load(Relaxed));
        loop {
            if test_dbg!(tail & self.closed != 0) {
                return Err(TrySendError::Closed(()));
            }
            let (idx, gen) = self.idx_gen(tail);
            test_dbg!(idx);
            test_dbg!(gen);
            // SAFETY: the index is always masked by `idx_mask`, which is
            // asserted (in debug builds) to be within `slots`.
            let slot = unsafe {
                debug_assert!(
                    idx < slots.len(),
                    "index out of bounds (index was {} but the length was {})\n\n\
                    /!\\ EXTREMELY SERIOUS WARNING /!\\: in release mode, this \
                    access would not have been bounds checked, resulting in \
                    undefined behavior!\nthis is a bug in `thingbuf`! please \
                    report an issue immediately!",
                    idx,
                    slots.len()
                );
                slots.get_unchecked(idx)
            };
            let raw_state = test_dbg!(slot.state.load(SeqCst));
            // Ignore the reader bit when deciding whether the slot is free.
            let state = test_dbg!(clear_has_reader(raw_state));
            // The slot is writable in this generation when its state equals
            // the current tail word.
            if test_dbg!(state == tail) {
                let next_tail = self.next(idx, gen);
                match test_dbg!(self
                    .tail
                    .compare_exchange_weak(tail, next_tail, SeqCst, Acquire))
                {
                    Ok(_) if test_dbg!(check_has_reader(raw_state)) => {
                        // A popped `Ref` still holds this slot; skip it and
                        // bump its state a full generation so it is not
                        // reused until released.
                        test_println!(
                            "advanced tail {} to {}; has an active reader, skipping slot [{}]",
                            tail,
                            next_tail,
                            idx
                        );
                        let next_state = wrapping_add(tail, self.gen);
                        test_dbg!(slot
                            .state
                            .fetch_update(SeqCst, SeqCst, |state| {
                                Some(state & HAS_READER | next_state)
                            })
                            .unwrap_or_else(|_| unreachable!()));
                        tail = next_tail;
                        backoff.spin();
                        continue;
                    }
                    Ok(_) => {
                        test_println!(
                            "advanced tail {} to {}; claimed slot [{}]",
                            tail,
                            next_tail,
                            idx
                        );
                        let ptr = slot.value.get_mut();
                        // SAFETY: we just claimed exclusive access to the
                        // slot by advancing the tail past it.
                        unsafe {
                            let ptr = ptr.deref();
                            if gen == 0 {
                                // First lap: the slot has never held a value.
                                ptr.write(recycle.new_element());
                                test_println!("-> initialized");
                            } else {
                                // Later laps: reuse the existing element.
                                recycle.recycle(ptr.assume_init_mut());
                                test_println!("-> recycled");
                            }
                        }
                        return Ok(Ref {
                            ptr,
                            // Marks the slot readable once the write `Ref`
                            // is dropped.
                            new_state: tail + 1,
                            slot,
                            is_pop: false,
                        });
                    }
                    Err(actual) => {
                        test_println!("failed to advance tail {} to {}", tail, next_tail);
                        tail = actual;
                        backoff.spin();
                        continue;
                    }
                }
            }
            // Slot not writable: the queue may be full...
            let head = test_dbg!(self.head.fetch_or(0, SeqCst));
            if test_dbg!(wrapping_add(head, self.gen) == tail) {
                test_println!("channel full");
                return Err(TrySendError::Full(()));
            }
            // ...or full because a previous-generation slot was skipped while
            // its reader was active.
            let (tail_idx, tail_gen) = self.idx_gen(tail);
            let (state_idx, state_gen) = self.idx_gen(state);
            if test_dbg!(state_idx == tail_idx + 1) && test_dbg!(state_gen < tail_gen) {
                test_println!("channel full");
                return Err(TrySendError::Full(()));
            }
            // Otherwise, a concurrent writer is mid-claim; spin and retry.
            backoff.spin_yield();
            tail = test_dbg!(self.tail.load(Acquire));
        }
    }

    /// Attempts to claim the slot at `head` for reading, advancing `head`.
    ///
    /// On success, sets the slot's `HAS_READER` bit and returns a read `Ref`.
    /// Returns `TryRecvError::Closed` once the queue is closed and drained,
    /// or `TryRecvError::Empty` when no value is ready.
    #[inline(always)]
    fn pop_ref<'slots, T>(&self, slots: &'slots [Slot<T>]) -> Result<Ref<'slots, T>, TryRecvError> {
        test_println!("pop_ref");
        let mut backoff = Backoff::new();
        let mut head = self.head.load(Relaxed);
        loop {
            test_dbg!(head);
            let (idx, gen) = self.idx_gen(head);
            test_dbg!(idx);
            test_dbg!(gen);
            // SAFETY: as in `push_ref`, the masked index must be in bounds.
            let slot = unsafe {
                debug_assert!(
                    idx < slots.len(),
                    "index out of bounds (index was {} but the length was {})\n\n\
                    /!\\ EXTREMELY SERIOUS WARNING /!\\: in release mode, this \
                    access would not have been bounds checked, resulting in \
                    undefined behavior!\nthis is a bug in `thingbuf`! please \
                    report an issue immediately!",
                    idx,
                    slots.len()
                );
                slots.get_unchecked(idx)
            };
            let raw_state = test_dbg!(slot.state.load(Acquire));
            let next_head = self.next(idx, gen);
            // `head + 1` is the state a writer stores after completing a
            // write, so the slot holds a readable value.
            if test_dbg!(raw_state == head + 1) {
                match test_dbg!(self
                    .head
                    .compare_exchange_weak(head, next_head, SeqCst, Acquire))
                {
                    Ok(_) => {
                        test_println!("advanced head {} to {}", head, next_head);
                        test_println!("claimed slot [{}]", idx);
                        // Advance the slot's state a full generation and mark
                        // it as having an active reader; writers will skip it
                        // until the `Ref` is dropped.
                        let mut new_state = wrapping_add(head, self.gen);
                        new_state = set_has_reader(new_state);
                        test_dbg!(slot.state.store(test_dbg!(new_state), SeqCst));
                        return Ok(Ref {
                            new_state,
                            ptr: slot.value.get_mut(),
                            slot,
                            is_pop: true,
                        });
                    }
                    Err(actual) => {
                        test_println!("failed to advance head, head={}, actual={}", head, actual);
                        head = actual;
                        backoff.spin();
                        continue;
                    }
                }
            } else {
                // Nothing readable here: the queue may be empty/closed...
                let tail = test_dbg!(self.tail.fetch_or(0, SeqCst));
                if test_dbg!(tail & !self.closed == head) {
                    return if test_dbg!(tail & self.closed != 0) {
                        Err(TryRecvError::Closed)
                    } else {
                        test_println!("--> channel empty!");
                        Err(TryRecvError::Empty)
                    };
                }
                // ...or a writer has claimed the slot but not finished
                // writing yet; spin briefly before giving up.
                if test_dbg!(raw_state == head) {
                    if test_dbg!(backoff.done_spinning()) {
                        return Err(TryRecvError::Empty);
                    }
                    backoff.spin();
                    continue;
                }
                // The slot was skipped (e.g. by `push_ref` while a reader was
                // active); try to skip it on the read side, too.
                match test_dbg!(self.head.compare_exchange(head, next_head, SeqCst, Acquire)) {
                    Ok(_) => {
                        test_println!("skipped head slot [{}], new head={}", idx, next_head);
                        head = next_head;
                    }
                    Err(actual) => {
                        test_println!(
                            "failed to skip head slot [{}], head={}, actual={}",
                            idx,
                            head,
                            actual
                        );
                        head = actual;
                        backoff.spin();
                    }
                }
            }
        }
    }

    /// Returns the number of elements currently in the queue.
    fn len(&self) -> usize {
        // Retry until two `tail` reads agree, so `head` was not read across a
        // concurrent tail advance.
        loop {
            let tail = self.tail.load(SeqCst);
            let head = self.head.load(SeqCst);
            if self.tail.load(SeqCst) == tail {
                let (head_idx, _) = self.idx_gen(head);
                let (tail_idx, _) = self.idx_gen(tail);
                return match head_idx.cmp(&tail_idx) {
                    cmp::Ordering::Less => tail_idx - head_idx,
                    cmp::Ordering::Greater => self.capacity - head_idx + tail_idx,
                    // Equal indices: either completely empty (same word,
                    // ignoring the closed bit) or completely full.
                    _ if (tail & !self.closed) == (head & !self.closed) => 0,
                    _ => self.capacity,
                };
            }
        }
    }

    /// Drops every initialized slot value in `slots`, at most once.
    fn drop_slots<T>(&mut self, slots: &mut [Slot<T>]) {
        debug_assert!(
            !self.has_dropped_slots,
            "tried to drop slots twice! core={:#?}",
            self
        );
        if self.has_dropped_slots {
            return;
        }
        let tail = self.tail.load(SeqCst);
        let (idx, gen) = self.idx_gen(tail);
        // After the first generation every slot has been written at least
        // once; on generation 0 only the slots before `idx` hold values.
        let num_initialized = if gen > 0 { self.capacity() } else { idx };
        for slot in &mut slots[..num_initialized] {
            // SAFETY: `&mut self` gives exclusive access, and the slot is
            // within the initialized prefix computed above.
            unsafe {
                slot.value
                    .with_mut(|value| ptr::drop_in_place((*value).as_mut_ptr()));
            }
        }
        self.has_dropped_slots = true;
    }
}
#[inline]
fn check_has_reader(state: usize) -> bool {
state & HAS_READER == HAS_READER
}
/// Returns `state` with the `HAS_READER` bit set.
#[inline]
fn set_has_reader(state: usize) -> usize {
    HAS_READER | state
}
#[inline]
fn clear_has_reader(state: usize) -> usize {
state & !HAS_READER
}
/// Adds two head/tail words, wrapping within the `MAX_CAPACITY` bit range so
/// the reserved `HAS_READER` bit is never produced.
#[inline]
fn wrapping_add(base: usize, offset: usize) -> usize {
    (base + offset) & MAX_CAPACITY
}
impl Drop for Core {
    /// Debug-only guard: a `Core` must not be dropped before `drop_slots`
    /// has run (the owning queue type is responsible for calling it), or the
    /// slot values would leak.
    fn drop(&mut self) {
        debug_assert!(
            self.has_dropped_slots,
            "tried to drop Core without dropping slots! core={:#?}",
            self
        );
    }
}
impl<T> Ref<'_, T> {
    /// Runs `f` with a shared reference to the slot's value.
    #[inline]
    fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // SAFETY: a `Ref` is only created after the slot was claimed and
            // its value initialized, and it holds exclusive access until
            // dropped.
            f(&*(*value).as_ptr())
        })
    }

    /// Runs `f` with an exclusive reference to the slot's value.
    #[inline]
    fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
        self.ptr.with(|value| unsafe {
            // SAFETY: as above, plus `&mut self` guarantees uniqueness of
            // this mutable borrow.
            f(&mut *(*value).as_mut_ptr())
        })
    }
}
impl<T> ops::Deref for Ref<'_, T> {
    type Target = T;

    #[inline]
    fn deref(&self) -> &Self::Target {
        // SAFETY: the slot's value was initialized when this `Ref` was
        // created, and the `Ref` holds exclusive access to the slot.
        unsafe {
            &*self.ptr.deref().as_ptr()
        }
    }
}
impl<T> ops::DerefMut for Ref<'_, T> {
    #[inline]
    fn deref_mut(&mut self) -> &mut Self::Target {
        // SAFETY: as in `Deref`, plus `&mut self` ensures this is the only
        // mutable borrow of the value.
        unsafe {
            &mut *self.ptr.deref().as_mut_ptr()
        }
    }
}
impl<T> Drop for Ref<'_, T> {
    /// Releases the slot back to the queue.
    ///
    /// For a read (`pop`) reference, this clears the `HAS_READER` bit so
    /// writers stop skipping the slot. For a write (`push`) reference, this
    /// publishes `new_state` (with `Release` ordering) so readers observe the
    /// written value.
    #[inline]
    fn drop(&mut self) {
        if self.is_pop {
            test_println!("drop Ref<{}> (pop)", core::any::type_name::<T>());
            test_dbg!(self.slot.state.fetch_and(!HAS_READER, SeqCst));
        } else {
            test_println!(
                "drop Ref<{}> (push), new_state = {}",
                core::any::type_name::<T>(),
                self.new_state
            );
            test_dbg!(self.slot.state.store(test_dbg!(self.new_state), Release));
        }
    }
}
// Forwards `Debug` formatting to the referenced value (preserving the
// caller's formatter flags, e.g. `{:#?}`).
impl<T: fmt::Debug> fmt::Debug for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Debug::fmt(val, f))
    }
}
// Forwards `Display` formatting to the referenced value.
impl<T: fmt::Display> fmt::Display for Ref<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        self.with(|val| fmt::Display::fmt(val, f))
    }
}
// Forwards `fmt::Write` to the referenced value, so e.g. a pooled `String`
// can be written into in place through the `Ref`.
impl<T: fmt::Write> fmt::Write for Ref<'_, T> {
    #[inline]
    fn write_str(&mut self, s: &str) -> fmt::Result {
        self.with_mut(|val| val.write_str(s))
    }

    #[inline]
    fn write_char(&mut self, c: char) -> fmt::Result {
        self.with_mut(|val| val.write_char(c))
    }

    #[inline]
    fn write_fmt(&mut self, f: fmt::Arguments<'_>) -> fmt::Result {
        self.with_mut(|val| val.write_fmt(f))
    }
}
// SAFETY: a `Ref` grants exclusive access to its slot's value, so sending it
// to another thread moves that exclusive access with it; this requires
// `T: Send`. `Sync` also only requires `T: Send` — NOTE(review): presumably
// because `&Ref` still only hands out access governed by the exclusive claim;
// verify against upstream reasoning.
unsafe impl<T: Send> Send for Ref<'_, T> {}
unsafe impl<T: Send> Sync for Ref<'_, T> {}
impl<T> Slot<T> {
    /// Allocates a boxed slice of `capacity` slots, each with its state
    /// initialized to its own index (the "empty, generation 0" state).
    #[cfg(feature = "alloc")]
    pub(crate) fn make_boxed_array(capacity: usize) -> alloc::boxed::Box<[Self]> {
        (0..capacity).map(|i| Slot::new(i)).collect()
    }

    feature! {
        #![all(feature = "static", not(all(loom, test)))]

        // Placeholder used only to initialize the array in a `const fn`;
        // every element is overwritten with its real index below.
        const EMPTY: Self = Self::new(usize::MAX);

        /// Builds a fixed-size slot array at compile time for
        /// `StaticThingBuf`. Uses a `while` loop because `for` is not
        /// allowed in `const fn`.
        pub(crate) const fn make_static_array<const CAPACITY: usize>() -> [Self; CAPACITY] {
            let mut array = [Self::EMPTY; CAPACITY];
            let mut i = 0;
            while i < CAPACITY {
                array[i] = Self::new(i);
                i += 1;
            }
            array
        }
    }

    /// Creates a slot whose state word starts at `idx`.
    #[cfg(not(all(loom, test)))]
    const fn new(idx: usize) -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(idx),
        }
    }

    /// Loom-testing variant of `new`; loom's types are not `const`-constructible.
    #[cfg(all(loom, test))]
    fn new(idx: usize) -> Self {
        Self {
            value: UnsafeCell::new(MaybeUninit::uninit()),
            state: AtomicUsize::new(idx),
        }
    }
}
unsafe impl<T: Sync> Sync for Slot<T> {}
impl<T> Full<T> {
pub fn into_inner(self) -> T {
self.0
}
}
// Hand-written (rather than derived) so the rejected value — which may not
// be `Debug` — is elided from the output.
impl<T> fmt::Debug for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Full(..)")
    }
}
// User-facing error message; does not mention the contained value.
impl<T> fmt::Display for Full<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "queue at capacity")
    }
}
// `std::error::Error` is only available with the "std" feature, since this
// crate is `no_std`-compatible otherwise.
#[cfg(feature = "std")]
impl<T> std::error::Error for Full<T> {}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: each test sets `core.has_dropped_slots = true` before the `Core`
    // is dropped, to satisfy the `debug_assert!` in `Core`'s `Drop` impl
    // (these tests never store values that need dropping via `drop_slots`).

    #[test]
    fn zero_len() {
        const CAP: usize = 16;
        let mut core = Core::new(CAP);
        assert_eq!(core.len(), 0);
        assert_eq!(core.capacity(), CAP);
        core.has_dropped_slots = true;
    }

    #[test]
    fn closed_channel_len() {
        // Closing must not affect the reported length.
        const CAP: usize = 16;
        let mut core = Core::new(CAP);
        core.close();
        assert_eq!(core.len(), 0);
        assert_eq!(core.capacity(), CAP);
        core.has_dropped_slots = true;
    }

    #[test]
    fn full_simple() {
        // Filling all slots makes the next push fail with `Full`.
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));
        core.has_dropped_slots = true;
    }

    #[test]
    fn full_read_and_write() {
        // Popping frees slots for reuse; the queue is full again once the
        // freed slots are refilled.
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));
        core.has_dropped_slots = true;
    }

    #[test]
    fn full_with_skip() {
        // `_hold` keeps a pop `Ref` alive, so its slot has the `HAS_READER`
        // bit set and is skipped by writers; the queue still reports full at
        // the right point.
        const CAP: usize = 3;
        let mut core = Core::new(CAP);
        let slots: Box<[Slot<usize>]> = Slot::<usize>::make_boxed_array(CAP);
        let recycle = recycling::DefaultRecycle::new();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        let _hold = core.pop_ref(&slots).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        core.pop_ref(&slots).unwrap();
        core.push_ref(&slots, &recycle).unwrap();
        assert!(matches!(
            core.push_ref(&slots, &recycle),
            Err(TrySendError::Full(()))
        ));
        core.has_dropped_slots = true;
    }
}