rtrb-basedrop 0.1.2

A fork of the `rtrb` crate that uses basedrop's `Shared` pointer in place of `Arc`.
//! This is a fork of the [`rtrb`] crate that uses [`basedrop`]'s `Shared` pointer in
//! place of `Arc`. This ensures that when all references to the ring buffer are
//! dropped, the underlying `Vec` is never deallocated (a non-realtime-safe operation)
//! on the realtime thread. Instead, all allocations are cleaned up on whatever thread
//! owns the basedrop `Collector` object.
//! 
//! This is especially useful for audio applications.
//! 
//! [`rtrb`]: https://github.com/mgeier/rtrb
//! [`basedrop`]: https://github.com/glowcoil/basedrop
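//! 
//! For example, dropping both ends of the queue does not free the buffer
//! immediately; the memory is reclaimed when the `Collector` collects.
//! A minimal sketch of this lifecycle:
//! 
//! ```
//! use rtrb_basedrop::RingBuffer;
//! use basedrop::Collector;
//!
//! let mut collector = Collector::new();
//! let (producer, consumer) = RingBuffer::<f32>::new(256, &collector.handle());
//!
//! // ... move `producer` and `consumer` to their respective threads ...
//!
//! drop(producer);
//! drop(consumer);
//! // The ring buffer is now queued on the collector; the actual
//! // deallocation happens here, on the collector-owning thread:
//! collector.collect();
//! ```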
//! 
//! -------------------------------------------------------------
//! 
//! A realtime-safe single-producer single-consumer (SPSC) ring buffer.
//!
//! A [`RingBuffer`] consists of two parts:
//! a [`Producer`] for writing into the ring buffer and
//! a [`Consumer`] for reading from the ring buffer.
//!
//! A fixed-capacity buffer is allocated on construction.
//! After that, no more memory is allocated (unless the type `T` does that internally).
//! Reading from and writing into the ring buffer is *lock-free* and *wait-free*.
//! All reading and writing functions return immediately.
//! Attempts to write to a full buffer return an error;
//! values inside the buffer are *not* overwritten.
//! Attempts to read from an empty buffer return an error as well.
//! Only a single thread can write into the ring buffer and a single thread
//! (typically a different one) can read from the ring buffer.
//! If the queue is empty, there is no way for the reading thread to wait
//! for new data, other than trying repeatedly until reading succeeds.
//! Similarly, if the queue is full, there is no way for the writing thread
//! to wait for newly available space to write to, other than trying repeatedly.
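//!
//! A simple busy-waiting receive loop might look like this (a sketch; real
//! applications typically add backoff or an OS-level wakeup mechanism):
//!
//! ```
//! use rtrb_basedrop::RingBuffer;
//! use basedrop::Collector;
//!
//! let collector = Collector::new();
//! let (mut producer, mut consumer) = RingBuffer::new(4, &collector.handle());
//! producer.push(42).unwrap();
//!
//! // Try repeatedly until an element becomes available:
//! let value = loop {
//!     if let Ok(v) = consumer.pop() {
//!         break v;
//!     }
//!     std::thread::yield_now();
//! };
//! assert_eq!(value, 42);
//! ```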
//!
//! # Examples
//!
//! Moving single elements into and out of a queue with
//! [`Producer::push()`] and [`Consumer::pop()`], respectively:
//!
//! ```
//! use rtrb_basedrop::{RingBuffer, PushError, PopError};
//! use basedrop::Collector;
//!
//! let collector = Collector::new();
//!
//! let (mut producer, mut consumer) = RingBuffer::new(2, &collector.handle());
//!
//! assert_eq!(producer.push(10), Ok(()));
//! assert_eq!(producer.push(20), Ok(()));
//! assert_eq!(producer.push(30), Err(PushError::Full(30)));
//!
//! std::thread::spawn(move || {
//!     assert_eq!(consumer.pop(), Ok(10));
//!     assert_eq!(consumer.pop(), Ok(20));
//!     assert_eq!(consumer.pop(), Err(PopError::Empty));
//! }).join().unwrap();
//! ```
//!
//! See the documentation of the [`chunks#examples`] module
//! for examples that write multiple items at once with
//! [`Producer::write_chunk_uninit()`] and [`Producer::write_chunk()`]
//! and read multiple items with [`Consumer::read_chunk()`].

#![cfg_attr(not(feature = "std"), no_std)]
#![warn(rust_2018_idioms)]
#![deny(missing_docs, missing_debug_implementations)]

extern crate alloc;

use alloc::vec::Vec;
use basedrop::{Handle, Shared};
use core::cell::Cell;
use core::fmt;
use core::marker::PhantomData;
use core::mem::{ManuallyDrop, MaybeUninit};
use core::sync::atomic::{AtomicUsize, Ordering};
use core::fmt::Debug;

use cache_padded::CachePadded;

pub mod chunks;

// This is used in the documentation.
#[allow(unused_imports)]
use chunks::WriteChunkUninit;

/// A bounded single-producer single-consumer (SPSC) queue.
///
/// Elements can be written with a [`Producer`] and read with a [`Consumer`],
/// both of which can be obtained with [`RingBuffer::new()`].
///
/// *See also the [crate-level documentation](crate).*
pub struct RingBuffer<T: Send + 'static> {
    /// The head of the queue.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
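    ///
    /// Using the doubled range (instead of `0 .. capacity`) makes it possible to
    /// distinguish a full queue from an empty one: `head == tail` means empty,
    /// while a distance of `capacity` between them (see `distance()`) means full.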
    head: CachePadded<AtomicUsize>,

    /// The tail of the queue.
    ///
    /// This integer is in range `0 .. 2 * capacity`.
    tail: CachePadded<AtomicUsize>,

    /// The buffer holding slots.
    data_ptr: *mut T,

    /// The queue capacity.
    capacity: usize,

    /// Indicates that dropping a `RingBuffer<T>` may drop elements of type `T`.
    _marker: PhantomData<T>,
}

impl<T: Send + 'static> Debug for RingBuffer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("RingBuffer");
        f.field("head", &self.head);
        f.field("tail", &self.tail);
        f.field("capacity", &self.capacity);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for RingBuffer<T> {}

impl<T: Send + 'static> RingBuffer<T> {
    /// Creates a `RingBuffer` with the given `capacity` and returns [`Producer`] and [`Consumer`].
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
    /// ```
    ///
    /// Specifying an explicit type with the [turbofish](https://turbo.fish/)
    /// is only necessary if it cannot be deduced by the compiler.
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut producer, consumer) = RingBuffer::new(100, &collector.handle());
    /// assert_eq!(producer.push(0.0f32), Ok(()));
    /// ```
    #[allow(clippy::new_ret_no_self)]
    #[must_use]
    pub fn new(capacity: usize, handle: &Handle) -> (Producer<T>, Consumer<T>) {
        let buffer = Shared::new(
            handle,
            RingBuffer {
                head: CachePadded::new(AtomicUsize::new(0)),
                tail: CachePadded::new(AtomicUsize::new(0)),
                data_ptr: ManuallyDrop::new(Vec::with_capacity(capacity)).as_mut_ptr(),
                capacity,
                _marker: PhantomData,
            },
        );
        let p = Producer {
            buffer: buffer.clone(),
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        let c = Consumer {
            buffer,
            cached_head: Cell::new(0),
            cached_tail: Cell::new(0),
        };
        (p, c)
    }

    /// Returns the capacity of the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (producer, consumer) = RingBuffer::<f32>::new(100, &collector.handle());
    /// assert_eq!(producer.buffer().capacity(), 100);
    /// assert_eq!(consumer.buffer().capacity(), 100);
    /// // Both producer and consumer of course refer to the same ring buffer:
    /// assert_eq!(producer.buffer(), consumer.buffer());
    /// ```
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Wraps a position from the range `0 .. 2 * capacity` to `0 .. capacity`.
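    ///
    /// E.g. with `capacity == 3`: `collapse_position(4) == 1`.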
    fn collapse_position(&self, pos: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        if pos < self.capacity {
            pos
        } else {
            pos - self.capacity
        }
    }

    /// Returns a pointer to the slot at position `pos`.
    ///
    /// If `pos == 0 && capacity == 0`, the returned pointer must not be dereferenced!
    unsafe fn slot_ptr(&self, pos: usize) -> *mut T {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        self.data_ptr.add(self.collapse_position(pos))
    }

    /// Increments a position by going `n` slots forward.
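    ///
    /// E.g. with `capacity == 3` (positions in `0 .. 6`): `increment(4, 2) == 0`.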
    fn increment(&self, pos: usize, n: usize) -> usize {
        debug_assert!(pos == 0 || pos < 2 * self.capacity);
        debug_assert!(n <= self.capacity);
        let threshold = 2 * self.capacity - n;
        if pos < threshold {
            pos + n
        } else {
            pos - threshold
        }
    }

    /// Increments a position by going one slot forward.
    ///
    /// This is more efficient than `self.increment(..., 1)`.
    fn increment1(&self, pos: usize) -> usize {
        debug_assert_ne!(self.capacity, 0);
        debug_assert!(pos < 2 * self.capacity);
        if pos < 2 * self.capacity - 1 {
            pos + 1
        } else {
            0
        }
    }

    /// Returns the distance between two positions.
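    ///
    /// E.g. with `capacity == 3`: `distance(5, 1) == 2`, since advancing twice
    /// from position 5 wraps around to position 1.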
    fn distance(&self, a: usize, b: usize) -> usize {
        debug_assert!(a == 0 || a < 2 * self.capacity);
        debug_assert!(b == 0 || b < 2 * self.capacity);
        if a <= b {
            b - a
        } else {
            2 * self.capacity - a + b
        }
    }
}

impl<T: Send + 'static> Drop for RingBuffer<T> {
    /// Drops all non-empty slots.
    fn drop(&mut self) {
        let mut head = self.head.load(Ordering::Relaxed);
        let tail = self.tail.load(Ordering::Relaxed);

        // Loop over all slots that hold a value and drop them.
        while head != tail {
            unsafe {
                self.slot_ptr(head).drop_in_place();
            }
            head = self.increment1(head);
        }

        // Finally, deallocate the buffer, but don't run any destructors.
        unsafe {
            Vec::from_raw_parts(self.data_ptr, 0, self.capacity);
        }
    }
}

impl<T: Send + 'static> PartialEq for RingBuffer<T> {
    /// This method tests for `self` and `other` values to be equal, and is used by `==`.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p1, c1) = RingBuffer::<f32>::new(1000, &collector.handle());
    /// assert_eq!(p1.buffer(), c1.buffer());
    ///
    /// let (p2, c2) = RingBuffer::<f32>::new(1000, &collector.handle());
    /// assert_ne!(p1.buffer(), p2.buffer());
    /// ```
    fn eq(&self, other: &Self) -> bool {
        core::ptr::eq(self, other)
    }
}

impl<T: Send + 'static> Eq for RingBuffer<T> {}

/// The producer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Consumer`]).
///
/// Individual elements can be moved into the ring buffer with [`Producer::push()`],
/// multiple elements at once can be written with [`Producer::write_chunk()`]
/// and [`Producer::write_chunk_uninit()`].
///
/// The number of free slots currently available for writing can be obtained with
/// [`Producer::slots()`].
///
/// When the `Producer` and the [`Consumer`] have both been dropped,
/// the ring buffer is handed to the basedrop collector and
/// [`RingBuffer::drop()`] runs (freeing the allocated memory) on the
/// thread that calls `collect()` on the owning `Collector`.
pub struct Producer<T: Send + 'static> {
    /// A reference to the ring buffer.
    buffer: Shared<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.head`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value is always in sync with `buffer.tail`.
    cached_tail: Cell<usize>,
}

impl<T: Send + 'static> Debug for Producer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Producer");
        f.field("buffer", &*self.buffer);
        f.field("cached_head", &self.cached_head);
        f.field("cached_tail", &self.cached_tail);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for Producer<T> {}

impl<T: Send + 'static> Producer<T> {
    /// Attempts to push an element into the queue.
    ///
    /// The element is *moved* into the ring buffer and its slot
    /// is made available to be read by the [`Consumer`].
    ///
    /// # Errors
    ///
    /// If the queue is full, the element is returned back as an error.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{RingBuffer, PushError};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(p.push(20), Err(PushError::Full(20)));
    /// ```
    pub fn push(&mut self, value: T) -> Result<(), PushError<T>> {
        if let Some(tail) = self.next_tail() {
            unsafe {
                self.buffer.slot_ptr(tail).write(value);
            }
            let tail = self.buffer.increment1(tail);
            self.buffer.tail.store(tail, Ordering::Release);
            self.cached_tail.set(tail);
            Ok(())
        } else {
            Err(PushError::Full(value))
        }
    }

    /// Returns the number of slots available for writing.
    ///
    /// Since items can be concurrently consumed on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Producer::is_full()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
    ///
    /// assert_eq!(p.slots(), 1024);
    /// ```
    pub fn slots(&self) -> usize {
        let head = self.buffer.head.load(Ordering::Acquire);
        self.cached_head.set(head);
        self.buffer.capacity - self.buffer.distance(head, self.cached_tail.get())
    }

    /// Returns `true` if there are currently no slots available for writing.
    ///
    /// A full ring buffer might cease to be full at any time
    /// if the corresponding [`Consumer`] is consuming items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    ///
    /// assert!(!p.is_full());
    /// ```
    ///
    /// Since items can be concurrently consumed on another thread, the ring buffer
    /// might not be full for long:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if p.is_full() {
    ///     // The buffer might be full, but it might as well not be
    ///     // if an item was just consumed on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not full, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if !p.is_full() {
    ///     // At least one slot is guaranteed to be available for writing.
    /// }
    /// ```
    pub fn is_full(&self) -> bool {
        self.next_tail().is_none()
    }
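
    // NOTE: Upstream `rtrb`'s `is_abandoned()` is disabled in this fork,
    // presumably because `basedrop::Shared` does not expose a reference count
    // equivalent to `Arc::strong_count()`.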

    /*
    /// Returns `true` if the corresponding [`Consumer`] has been destroyed.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(7, &collector.handle());
    /// assert!(!p.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(c);
    /// // The items that are still in the ring buffer are not accessible anymore.
    /// assert!(p.is_abandoned());
    /// // Even though it's futile, items can still be written:
    /// assert_eq!(p.push(11), Ok(()));
    /// ```
    ///
    /// Since the consumer can be concurrently dropped on another thread,
    /// the producer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if !p.is_abandoned() {
    ///     // Right now, the consumer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if p.is_abandoned() {
    ///     // The consumer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        Shared::strong_count(&self.buffer) < 2
    }
    */

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the tail position for writing the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `write_chunk_uninit()`.
    /// For performance, this special case is implemented separately.
    fn next_tail(&self) -> Option<usize> {
        let tail = self.cached_tail.get();

        // Check if the queue is *possibly* full.
        if self.buffer.distance(self.cached_head.get(), tail) == self.buffer.capacity {
            // Refresh the head ...
            let head = self.buffer.head.load(Ordering::Acquire);
            self.cached_head.set(head);

            // ... and check if it's *really* full.
            if self.buffer.distance(head, tail) == self.buffer.capacity {
                return None;
            }
        }
        Some(tail)
    }
}

/// The consumer side of a [`RingBuffer`].
///
/// Can be moved between threads,
/// but references from different threads are not allowed
/// (i.e. it is [`Send`] but not [`Sync`]).
///
/// Can only be created with [`RingBuffer::new()`]
/// (together with its counterpart, the [`Producer`]).
///
/// Individual elements can be moved out of the ring buffer with [`Consumer::pop()`],
/// multiple elements at once can be read with [`Consumer::read_chunk()`].
///
/// The number of slots currently available for reading can be obtained with
/// [`Consumer::slots()`].
///
/// When the `Consumer` and the [`Producer`] have both been dropped,
/// the ring buffer is handed to the basedrop collector and
/// [`RingBuffer::drop()`] runs (freeing the allocated memory) on the
/// thread that calls `collect()` on the owning `Collector`.
pub struct Consumer<T: Send + 'static> {
    /// A reference to the ring buffer.
    buffer: Shared<RingBuffer<T>>,

    /// A copy of `buffer.head` for quick access.
    ///
    /// This value is always in sync with `buffer.head`.
    cached_head: Cell<usize>,

    /// A copy of `buffer.tail` for quick access.
    ///
    /// This value can be stale and sometimes needs to be resynchronized with `buffer.tail`.
    cached_tail: Cell<usize>,
}

impl<T: Send + 'static> Debug for Consumer<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut f = f.debug_struct("Consumer");
        f.field("buffer", &*self.buffer);
        f.field("cached_head", &self.cached_head);
        f.field("cached_tail", &self.cached_tail);
        f.finish()
    }
}

unsafe impl<T: Send + 'static> Send for Consumer<T> {}

impl<T: Send + 'static> Consumer<T> {
    /// Attempts to pop an element from the queue.
    ///
    /// The element is *moved* out of the ring buffer and its slot
    /// is made available to be filled by the [`Producer`] again.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{PopError, RingBuffer};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.pop(), Ok(10));
    /// assert_eq!(c.pop(), Err(PopError::Empty));
    /// ```
    ///
    /// To obtain an [`Option<T>`](Option), use [`.ok()`](Result::ok) on the result.
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (mut p, mut c) = RingBuffer::new(1, &collector.handle());
    /// assert_eq!(p.push(20), Ok(()));
    /// assert_eq!(c.pop().ok(), Some(20));
    /// ```
    pub fn pop(&mut self) -> Result<T, PopError> {
        if let Some(head) = self.next_head() {
            let value = unsafe { self.buffer.slot_ptr(head).read() };
            let head = self.buffer.increment1(head);
            self.buffer.head.store(head, Ordering::Release);
            self.cached_head.set(head);
            Ok(value)
        } else {
            Err(PopError::Empty)
        }
    }

    /// Attempts to read an element from the queue without removing it.
    ///
    /// # Errors
    ///
    /// If the queue is empty, an error is returned.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::{PeekError, RingBuffer};
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, c) = RingBuffer::new(1, &collector.handle());
    ///
    /// assert_eq!(c.peek(), Err(PeekError::Empty));
    /// assert_eq!(p.push(10), Ok(()));
    /// assert_eq!(c.peek(), Ok(&10));
    /// assert_eq!(c.peek(), Ok(&10));
    /// ```
    pub fn peek(&self) -> Result<&T, PeekError> {
        if let Some(head) = self.next_head() {
            Ok(unsafe { &*self.buffer.slot_ptr(head) })
        } else {
            Err(PeekError::Empty)
        }
    }

    /// Returns the number of slots available for reading.
    ///
    /// Since items can be concurrently produced on another thread, the actual number
    /// of available slots may increase at any time (up to the [`RingBuffer::capacity()`]).
    ///
    /// To check for a single available slot,
    /// using [`Consumer::is_empty()`] is often quicker
    /// (because it might not have to check an atomic variable).
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1024, &collector.handle());
    ///
    /// assert_eq!(c.slots(), 0);
    /// ```
    pub fn slots(&self) -> usize {
        let tail = self.buffer.tail.load(Ordering::Acquire);
        self.cached_tail.set(tail);
        self.buffer.distance(self.cached_head.get(), tail)
    }

    /// Returns `true` if there are currently no slots available for reading.
    ///
    /// An empty ring buffer might cease to be empty at any time
    /// if the corresponding [`Producer`] is producing items in another thread.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    ///
    /// assert!(c.is_empty());
    /// ```
    ///
    /// Since items can be concurrently produced on another thread, the ring buffer
    /// might not be empty for long:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if c.is_empty() {
    ///     // The buffer might be empty, but it might as well not be
    ///     // if an item was just produced on another thread.
    /// }
    /// ```
    ///
    /// However, if it's not empty, another thread cannot change that:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<f32>::new(1, &collector.handle());
    /// if !c.is_empty() {
    ///     // At least one slot is guaranteed to be available for reading.
    /// }
    /// ```
    pub fn is_empty(&self) -> bool {
        self.next_head().is_none()
    }

    /*
    /// Returns `true` if the corresponding [`Producer`] has been destroyed.
    ///
    /// # Examples
    ///
    /// ```
    /// use rtrb_basedrop::RingBuffer;
    /// use basedrop::Collector;
    ///
    /// let collector = Collector::new();
    ///
    /// let (mut p, mut c) = RingBuffer::new(7, &collector.handle());
    /// assert!(!c.is_abandoned());
    /// assert_eq!(p.push(10), Ok(()));
    /// drop(p);
    /// assert!(c.is_abandoned());
    /// // The items that are left in the ring buffer can still be consumed:
    /// assert_eq!(c.pop(), Ok(10));
    /// ```
    ///
    /// Since the producer can be concurrently dropped on another thread,
    /// the consumer might become abandoned at any time:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if !c.is_abandoned() {
    ///     // Right now, the producer might still be alive, but it might as well not be
    ///     // if another thread has just dropped it.
    /// }
    /// ```
    ///
    /// However, if it already is abandoned, it will stay that way:
    ///
    /// ```
    /// # use rtrb_basedrop::RingBuffer;
    /// # use basedrop::Collector;
    /// # let collector = Collector::new();
    /// # let (p, c) = RingBuffer::<i32>::new(1, &collector.handle());
    /// if c.is_abandoned() {
    ///     // The producer does definitely not exist anymore.
    /// }
    /// ```
    pub fn is_abandoned(&self) -> bool {
        Shared::strong_count(&self.buffer) < 2
    }
    */

    /// Returns a read-only reference to the ring buffer.
    pub fn buffer(&self) -> &RingBuffer<T> {
        &self.buffer
    }

    /// Get the head position for reading the next slot, if available.
    ///
    /// This is a strict subset of the functionality implemented in `read_chunk()`.
    /// For performance, this special case is implemented separately.
    fn next_head(&self) -> Option<usize> {
        let head = self.cached_head.get();

        // Check if the queue is *possibly* empty.
        if head == self.cached_tail.get() {
            // Refresh the tail ...
            let tail = self.buffer.tail.load(Ordering::Acquire);
            self.cached_tail.set(tail);

            // ... and check if it's *really* empty.
            if head == tail {
                return None;
            }
        }
        Some(head)
    }
}

/// Extension trait used to provide a [`copy_to_uninit()`](CopyToUninit::copy_to_uninit)
/// method on built-in slices.
///
/// This can be used to safely copy data to the slices returned from
/// [`WriteChunkUninit::as_mut_slices()`].
///
/// To use this, the trait has to be brought into scope, e.g. with:
///
/// ```
/// use rtrb_basedrop::CopyToUninit;
/// ```
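///
/// A sketch of copying into an uninitialized write chunk (assuming the `chunks`
/// module keeps upstream `rtrb`'s `write_chunk_uninit()`/`as_mut_slices()`/`commit()`
/// API):
///
/// ```
/// use rtrb_basedrop::{CopyToUninit, RingBuffer};
/// use basedrop::Collector;
///
/// let collector = Collector::new();
/// let (mut producer, consumer) = RingBuffer::<u8>::new(4, &collector.handle());
///
/// let source = [1, 2, 3];
/// if let Ok(mut chunk) = producer.write_chunk_uninit(3) {
///     let (first, second) = chunk.as_mut_slices();
///     // In a fresh buffer, all 3 requested slots are contiguous:
///     source.copy_to_uninit(first);
///     assert!(second.is_empty());
///     // Safety: the first 3 slots were initialized above.
///     unsafe { chunk.commit(3) };
/// }
/// ```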
pub trait CopyToUninit<T: Copy + Send + 'static> {
    /// Copies contents to a possibly uninitialized slice.
    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T];
}

impl<T: Copy + Send + 'static> CopyToUninit<T> for [T] {
    /// Copies contents to a possibly uninitialized slice.
    ///
    /// # Panics
    ///
    /// This function will panic if the two slices have different lengths.
    fn copy_to_uninit(&self, dst: &mut [MaybeUninit<T>]) -> &mut [T] {
        assert_eq!(
            self.len(),
            dst.len(),
            "source slice length does not match destination slice length"
        );
        let dst_ptr = dst.as_mut_ptr() as *mut _;
        unsafe {
            self.as_ptr().copy_to_nonoverlapping(dst_ptr, self.len());
            core::slice::from_raw_parts_mut(dst_ptr, self.len())
        }
    }
}

/// Error type for [`Consumer::pop()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PopError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PopError {}

impl fmt::Display for PopError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PopError::Empty => write!(f, "empty ring buffer"),
        }
    }
}

/// Error type for [`Consumer::peek()`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum PeekError {
    /// The queue was empty.
    Empty,
}

#[cfg(feature = "std")]
impl std::error::Error for PeekError {}

impl fmt::Display for PeekError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PeekError::Empty => write!(f, "empty ring buffer"),
        }
    }
}

/// Error type for [`Producer::push()`].
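///
/// The rejected element is contained in the error, so it can be recovered:
///
/// ```
/// use rtrb_basedrop::{PushError, RingBuffer};
/// use basedrop::Collector;
///
/// let collector = Collector::new();
/// let (mut producer, consumer) = RingBuffer::new(1, &collector.handle());
///
/// assert_eq!(producer.push(10), Ok(()));
/// if let Err(PushError::Full(value)) = producer.push(20) {
///     // `value` is the element that did not fit into the queue.
///     assert_eq!(value, 20);
/// }
/// ```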
#[derive(Copy, Clone, PartialEq, Eq)]
pub enum PushError<T> {
    /// The queue was full.
    Full(T),
}

#[cfg(feature = "std")]
impl<T> std::error::Error for PushError<T> {}

impl<T> fmt::Debug for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PushError::Full(_) => f.pad("Full(_)"),
        }
    }
}

impl<T> fmt::Display for PushError<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PushError::Full(_) => write!(f, "full ring buffer"),
        }
    }
}