fring 0.4.0

Lightweight, no_std, and *fast* ring buffer
Documentation
//! Fast ring buffer intended for no_std targets.
//!
//! `fring` ("fast ring") is a fast and lightweight circular buffer, designed
//! for embedded systems and other no_std targets.  ("Circular buffer" means
//! it is a FIFO queue, stored as an array, and the data wraps back to the
//! beginning of the array once it reaches the end.)  The memory footprint of
//! a `fring::Buffer` is the buffer itself plus two `usize` indices.
//!
//! The buffer allows a single producer and a single consumer, which may
//! operate concurrently.  Memory safety and thread safety are enforced at
//! compile time; the buffer is lock-free at runtime.  The buffer length is
//! required to be a power of two, and the only arithmetic operations used by
//! buffer operations are addition/subtraction and bitwise and.
//!
//! The only way to use a [`Buffer`] is to split it into a [`Producer`] and a
//! [`Consumer`].  Then one may call [`Producer::write()`] and
//! [`Consumer::read()`], or any of the other methods which are provided by
//! `Producer` and `Consumer`.
//!
//! Example of safe threaded use:
//! ```rust
//! # const N: usize = 8;
//! # fn make_data(_: fring::Producer<u8, N>) {}
//! # fn use_data(_: fring::Consumer<u8, N>) {}
//! fn main() {
//!     let mut buffer = fring::Buffer::<u8, N>::new();
//!     let (producer, consumer) = buffer.split();
//!     std::thread::scope(|s| {
//!         s.spawn(|| {
//!             make_data(producer);
//!         });
//!         use_data(consumer);
//!     });
//! }
//! ```
//!
//! Example of static use (requires `unsafe`):
//! ```rust
//! # const N: usize = 8;
//! # fn write_data(_: fring::Producer<u8, N>) {}
//! # fn use_data(_: fring::Consumer<u8, N>) {}
//! static BUFFER: fring::Buffer<u8, N> = fring::Buffer::new();
//!
//! fn interrupt_handler() {
//!     // UNSAFE: this is safe because this is the only place we ever
//!     // call BUFFER.producer(), and interrupt_handler() is not reentrant
//!     let producer = unsafe { BUFFER.producer() };
//!     write_data(producer);
//! }
//!
//! fn main() {
//!     // UNSAFE: this is safe because this is the only place we ever
//!     // call BUFFER.consumer(), and main() is not reentrant
//!     let consumer = unsafe { BUFFER.consumer() };
//!     use_data(consumer);
//! }
//! ```

#![no_std]

use core::sync::atomic::{AtomicUsize, Ordering::Relaxed};

/// A fixed-capacity ring buffer: a `[T; N]` array together with two `usize`
/// indices into that array.
///
/// `N` must be a power of two.  (If you need more flexibility with sizing,
/// consider using a `bbqueue::BBBuffer` instead.)  A `Buffer<T, N>` can hold
/// `N` elements of type `T` and guarantees FIFO ordering.
///
/// The only way to use a `Buffer` is to split it into a [`Producer`] and a
/// [`Consumer`], which may then be passed to different threads or contexts.
pub struct Buffer<T, const N: usize> {
    /// Backing storage for the elements.
    data: core::cell::UnsafeCell<[T; N]>,
    /// Next index to be read.
    head: AtomicUsize,
    /// Next index to be written.
    tail: AtomicUsize,
}
// Index arithmetic:
//
// * `head` and `tail` are free-running counters: they may increment all the
//   way to `usize::MAX` and wrap around.
// * The invariants `0 <= tail - head <= N` and `0 <= N + head - tail <= N`
//   are maintained (these may be *wrapping* subtractions).
// * The positions inside `data` are `head % N` and `tail % N`.  Because `N`
//   is a power of 2, these equal `head & (N - 1)` and `tail & (N - 1)`.
// * Empty buffer: `head == tail`.
// * Full buffer: `head + N == tail` (this may be a *wrapping* addition).

/// The write-side handle of a [`Buffer`].
///
/// A `Producer` is a smart pointer to a `Buffer` which carries the right to
/// add data into that buffer.  Only one `Producer` may exist at one time for
/// any given buffer, and the methods of a `Producer` are the only way to
/// insert data into a `Buffer`.
pub struct Producer<'a, T, const N: usize> {
    // Invariant: the Producer is allowed to increment `buffer.tail` (up to a
    // maximum value of `buffer.head + N`), but may not modify `buffer.head`.
    buffer: &'a Buffer<T, N>,
}

/// The read-side handle of a [`Buffer`].
///
/// A `Consumer` is a smart pointer to a `Buffer` which carries the right to
/// remove data from that buffer.  Only one `Consumer` may exist at one time
/// for any given buffer, and the methods of a `Consumer` are the only way to
/// read data out of a `Buffer`.
pub struct Consumer<'a, T, const N: usize> {
    // Invariant: the Consumer is allowed to increment `buffer.head` (up to a
    // maximum value of `buffer.tail`), but may not modify `buffer.tail`.
    buffer: &'a Buffer<T, N>,
}

/// A smart pointer to one specific stretch of elements inside a [`Buffer`].
///
/// A `Region` derefs to `[T]` and may generally be used in the same way as a
/// slice (e.g. `region[i]`, `region.len()`).  Dropping a `Region` updates the
/// associated `Buffer` to indicate that this section of the buffer is finished
/// being read or written.  If a `Region` is forgotten instead of dropped, the
/// buffer is not updated and the same region will be re-issued by the next
/// read/write.
///
/// A `Region` holds a mutable (i.e. exclusive) reference to its owner (of
/// type `O`), which is either a `Producer` (when writing to a buffer) or a
/// `Consumer` (when reading from a buffer).  Consequently, for a given buffer,
/// at most one region for reading (tied to the consumer) and one region for
/// writing (tied to the producer) can exist at any time.  This is the
/// mechanism by which thread safety for the ring buffer is enforced at
/// compile time.
pub struct Region<'b, O, T> {
    /// Subslice of `Buffer.data` covered by this region.
    region: &'b mut [T],
    /// The index this region advances: `Buffer.head` or `Buffer.tail`.
    index_to_increment: &'b AtomicUsize,
    /// Exclusive borrow of the `Producer` or `Consumer` that issued the region.
    _owner: &'b mut O,
}

impl<T: Sized, const N: usize> Buffer<T, N> {
    /// Create a new, empty buffer whose backing memory is zero-initialized.
    ///
    /// Instantiating this function with an `N` that is not a power of two
    /// (including `N == 0`) is rejected at compile time.
    ///
    /// NOTE(review): the backing array is created with `core::mem::zeroed()`,
    /// which is only valid for element types whose all-zero bit pattern is a
    /// valid value (e.g. integers).  No bound on `T` enforces this here, so
    /// choosing a suitable `T` is up to the caller -- confirm whether a
    /// stricter bound is wanted.
    pub const fn new() -> Self {
        // Evaluated at compile time: forces a build failure if N is not a
        // power of 2.
        const { assert!(N.is_power_of_two(), "buffer size must be a power of 2") };
        Buffer {
            // SAFETY: only holds when all-zero bytes are a valid `T`; see the
            // NOTE(review) in the doc comment above.
            data: core::cell::UnsafeCell::new(unsafe { core::mem::zeroed() }),
            head: AtomicUsize::new(0),
            tail: AtomicUsize::new(0),
        }
    }

    /// Split the `Buffer` into a `Producer` and a `Consumer`.
    ///
    /// This is the only safe way to create a `Producer` or a `Consumer`.  It
    /// takes a mutable (i.e. exclusive) reference to the buffer, and the
    /// lifetime of that reference is also the lifetime of the returned
    /// producer and consumer.  Therefore, for a given buffer, only one
    /// producer and one consumer can exist at one time.
    pub fn split(&mut self) -> (Producer<'_, T, N>, Consumer<'_, T, N>) {
        // Downgrade the exclusive borrow to a shared one that both halves hold.
        let shared: &Self = self;
        (Producer { buffer: shared }, Consumer { buffer: shared })
    }

    /// Return a `Producer` associated with this buffer.
    /// # Safety
    /// Ensure that at most one `Producer` for this buffer exists at any time.
    pub unsafe fn producer(&self) -> Producer<'_, T, N> {
        Producer { buffer: self }
    }

    /// Return a `Consumer` associated with this buffer.
    /// # Safety
    /// Ensure that at most one `Consumer` for this buffer exists at any time.
    pub unsafe fn consumer(&self) -> Consumer<'_, T, N> {
        Consumer { buffer: self }
    }

    /// Pointer and length calculations used by `slice()`.
    ///
    /// `indices` is `[start, end]`.  Returns, in order:
    /// * a pointer to the element of `Buffer.data` at position `start`
    ///   (wrapped to the buffer length),
    /// * the maximum length from `start` which doesn't wrap around the end of
    ///   the array,
    /// * the maximum length `<= target_len` which fits between `start` and
    ///   `end`.
    #[inline(always)]
    fn calc_pointers(&self, indices: [usize; 2], target_len: usize) -> (*mut T, usize, usize) {
        let [start, end] = indices;
        // N is a power of two, so masking with N - 1 is `start % N`.
        let offset = start & (N - 1);
        let base = self.data.get() as *mut T;
        // SAFETY: `offset < N`, so the resulting pointer stays inside the
        // `[T; N]` array owned by `self.data`.
        let start_ptr = unsafe { base.add(offset) };
        let wrap_len = N - offset;
        let len = core::cmp::min(target_len, end.wrapping_sub(start));
        (start_ptr, wrap_len, len)
    }

    /// Internal use only.  Return a `T` slice extending from `indices[0]` to
    /// `indices[1]`, except that the slice shall not be longer than
    /// `target_len`, and the slice shall not wrap around the end of the
    /// buffer.  Start and end indices are wrapped to the buffer length.
    ///
    /// UNSAFE: the caller is responsible for ensuring that overlapping slices
    /// are never created, since a mutable (i.e. exclusive) slice is returned.
    #[inline(always)]
    #[allow(clippy::mut_from_ref)]
    unsafe fn slice(&self, indices: [usize; 2], target_len: usize) -> &mut [T] {
        let (start_ptr, wrap_len, len) = self.calc_pointers(indices, target_len);
        // Never run past the end of the backing array.
        let contiguous_len = core::cmp::min(len, wrap_len);
        // SAFETY: `start_ptr` points into `self.data` and
        // `contiguous_len <= wrap_len` keeps the slice inside the array;
        // exclusivity of the returned slice is the caller's obligation.
        unsafe { core::slice::from_raw_parts_mut(start_ptr, contiguous_len) }
    }
}

impl<T: Sized, const N: usize> Default for Buffer<T, N> {
    /// The default value is an empty buffer.
    fn default() -> Self {
        Self::new()
    }
}

// SAFETY (both impls): `Buffer` only contains `[T; N]` storage and two atomic
// indices.  The storage is handed out exclusively as `&mut [T]` through a
// single `Producer` and a single `Consumer`, so elements are *moved* between
// the producer's context and the consumer's context but never shared.  Moving
// a `T` to another thread is only sound when `T: Send`, hence the bound: an
// unconditional impl would let e.g. `Buffer<Option<Rc<_>>, N>` carry an `Rc`
// across threads in safe code.
unsafe impl<T: Send, const N: usize> Send for Buffer<T, N> {}
/// `Buffer<T, N>` is `Send` and `Sync` (for `T: Send`) because accesses to its
/// internal data are only possible via a single `Producer` and a single
/// `Consumer` at any time.
unsafe impl<T: Send, const N: usize> Sync for Buffer<T, N> {}

impl<'a, T: Sized, const N: usize> Producer<'a, T, N> {
    /// Return `[start, end]` of the writable window: `start` is the current
    /// `tail`, `end` is `head + N` (wrapping).
    fn indices(&self) -> [usize; 2] {
        use core::sync::atomic::Ordering::Acquire;
        [
            // `tail` is the producer's own index, so a relaxed load suffices.
            self.buffer.tail.load(Relaxed),
            // `head` is advanced by the consumer side.  Acquire ensures that,
            // when `head` was published with Release ordering, every access
            // the consumer made to the freed slots happens-before the
            // producer reuses them.  A relaxed load gives no such guarantee.
            self.buffer.head.load(Acquire).wrapping_add(N),
        ]
    }
    /// Return a `Region` for up to `target_len` elements to be written into
    /// the buffer.
    ///
    /// The returned region is guaranteed to be not longer than `target_len`,
    /// and it may be shorter: it is limited by the free space and it never
    /// wraps around the end of the backing array.  The region has length zero
    /// if and only if the buffer is full or `target_len == 0`.
    /// To write the largest possible length, set `target_len = usize::MAX`.
    pub fn write<'b>(&'b mut self, target_len: usize) -> Region<'b, Self, T> {
        Region {
            // SAFETY: the slice lies between `tail` and `head + N`, i.e. in
            // slots which are not available to the consumer, and the `&mut
            // self` borrow stored in `_owner` prevents this producer from
            // issuing a second, overlapping write region.
            region: unsafe { self.buffer.slice(self.indices(), target_len) },
            index_to_increment: &self.buffer.tail,
            _owner: self,
        }
    }
    /// Return the amount of empty space currently available in the buffer.
    /// If the consumer is reading concurrently with this call, then the amount
    /// of empty space may increase, but it will not decrease below the value
    /// which is returned.
    pub fn empty_size(&self) -> usize {
        let [start, end] = self.indices();
        end.wrapping_sub(start)
    }
}

impl<'a, T: Sized, const N: usize> Consumer<'a, T, N> {
    /// Return `[start, end]` of the readable window: `start` is the current
    /// `head`, `end` is the current `tail`.
    fn indices(&self) -> [usize; 2] {
        use core::sync::atomic::Ordering::Acquire;
        [
            // `head` is the consumer's own index, so a relaxed load suffices.
            self.buffer.head.load(Relaxed),
            // `tail` is advanced by the producer side.  Acquire ensures that,
            // when `tail` was published with Release ordering, the producer's
            // writes to the newly filled slots happen-before the consumer
            // reads them.  A relaxed load gives no such guarantee.
            self.buffer.tail.load(Acquire),
        ]
    }
    /// Return a `Region` for up to `target_len` elements to be read from
    /// the buffer.
    ///
    /// The returned region is guaranteed to be not longer than `target_len`,
    /// and it may be shorter: it is limited by the amount of stored data and
    /// it never wraps around the end of the backing array.  The region has
    /// length zero if and only if the buffer is empty or `target_len == 0`.
    /// To read the largest possible length, set `target_len = usize::MAX`.
    ///
    /// Even though we are reading from the buffer, the `Region` which is returned
    /// is mutable.  Its memory is available for arbitrary use by the caller
    /// for as long as the `Region` remains in scope.
    pub fn read<'b>(&'b mut self, target_len: usize) -> Region<'b, Self, T> {
        Region {
            // SAFETY: the slice lies between `head` and `tail`, i.e. in slots
            // which are not available to the producer, and the `&mut self`
            // borrow stored in `_owner` prevents this consumer from issuing a
            // second, overlapping read region.
            region: unsafe { self.buffer.slice(self.indices(), target_len) },
            index_to_increment: &self.buffer.head,
            _owner: self,
        }
    }
    /// Return the amount of data currently stored in the buffer.
    /// If the producer is writing concurrently with this call,
    /// then the amount of data may increase, but it will not
    /// decrease below the value which is returned.
    pub fn data_size(&self) -> usize {
        let [start, end] = self.indices();
        end.wrapping_sub(start)
    }
    /// Discard all data which is currently stored in the buffer.
    /// If the producer is writing concurrently with this call,
    /// then the producer's newest data may not be discarded.
    pub fn flush(&mut self) {
        use core::sync::atomic::Ordering::Release;
        // No element is read here, so a relaxed load of `tail` is enough.
        let tail = self.buffer.tail.load(Relaxed);
        // Release: this store hands every slot up to `tail` back to the
        // producer, so all earlier consumer accesses to those slots must be
        // ordered before it.  A relaxed store would not provide that.
        self.buffer.head.store(tail, Release);
    }
}

impl<'b, O, T: Sized> Region<'b, O, T> {
    /// Update the buffer to indicate that the first `num` elements of this region are
    /// finished being read or written.  The start and length of this region will be
    /// updated such that the remaining `region.len() - num` elements remain in this
    /// region for future reading or writing.
    ///
    /// # Panics
    /// Panics if `num > region.len()`.
    pub fn consume(&mut self, num: usize) {
        use core::sync::atomic::Ordering::Release;
        assert!(num <= self.region.len());
        // Narrow the region *before* publishing, so that it no longer covers
        // the elements which are about to be handed to the other side.
        // `mem::take` moves the `&'b mut [T]` out (leaving an empty slice),
        // which lets us re-slice it in safe code while keeping lifetime `'b`.
        let whole = core::mem::take(&mut self.region);
        self.region = &mut whole[num..];
        // Release: every access made to the first `num` elements must be
        // visible to the other side once it observes the advanced index
        // (with an Acquire load).  A relaxed increment does not ensure that.
        self.index_to_increment.fetch_add(num, Release);
    }
    /// Update the buffer to indicate that the first `num` elements of this region are
    /// finished being read or written, and the remaining `region.len() - num` elements
    /// will not be used.  `region.partial_drop(0)` is equivalent to
    /// `core::mem::forget(region)`.
    ///
    /// # Panics
    /// Panics if `num > region.len()`.
    pub fn partial_drop(self, num: usize) {
        use core::sync::atomic::Ordering::Release;
        assert!(num <= self.region.len());
        let index = self.index_to_increment;
        // Give up the region (without running `drop()`, which would advance
        // the index by the full length) before publishing the new index.
        core::mem::forget(self);
        // Release: see `consume()`.
        index.fetch_add(num, Release);
    }
}

impl<'b, O, T: Sized> Drop for Region<'b, O, T> {
    /// Update the buffer to indicate that the memory being read or written is now
    /// ready for use. Dropping a `Region` requires a single addition operation to
    /// one field of the `Buffer`.
    fn drop(&mut self) {
        use core::sync::atomic::Ordering::Release;
        // Release: advancing the index hands the region's memory to the other
        // side of the buffer.  All reads/writes made through this region must
        // be ordered before that hand-over, so that a side which observes the
        // new index with an Acquire load also observes those accesses.  A
        // relaxed increment would allow the index update to become visible
        // before the element data.
        self.index_to_increment
            .fetch_add(self.region.len(), Release);
    }
}

impl<'b, O, T: Sized> core::ops::Deref for Region<'b, O, T> {
    type Target = [T];
    /// Borrow the elements covered by this region as a shared slice.
    fn deref(&self) -> &Self::Target {
        &*self.region
    }
}

impl<'b, O, T: Sized> core::ops::DerefMut for Region<'b, O, T> {
    /// Borrow the elements covered by this region as a mutable slice.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut *self.region
    }
}

/// Check that reads and writes keep working while `head` and `tail` wrap
/// around from `usize::MAX` to zero.
#[test]
fn index_wraparound() {
    const CAPACITY: usize = 64;
    const HALF: usize = CAPACITY / 2;

    // This can't be tested using the public interface because it would
    // take too long to get `head` and `tail` incremented to usize::MAX.
    // Instead, start both indices 128 below the wrap-around point,
    // i.e. at usize::MAX - 127.
    let mut buffer = Buffer::<u8, CAPACITY>::new();
    let start = 0usize.wrapping_sub(128);
    buffer.head.store(start, Relaxed);
    buffer.tail.store(start, Relaxed);

    let (mut producer, mut consumer) = buffer.split();
    // Four passes of 64 elements each carry both indices from -128 to +128.
    for _ in 0..4 {
        // Fill the buffer in two halves; each temporary region is dropped at
        // the end of its statement, which commits the write.
        assert_eq!(producer.empty_size(), CAPACITY);
        assert_eq!(producer.write(HALF).len(), HALF);
        assert_eq!(producer.empty_size(), HALF);
        assert_eq!(producer.write(usize::MAX).len(), HALF);
        assert_eq!(producer.empty_size(), 0);
        assert_eq!(producer.write(usize::MAX).len(), 0);

        // Drain the buffer in two halves.
        assert_eq!(consumer.data_size(), CAPACITY);
        assert_eq!(consumer.read(HALF).len(), HALF);
        assert_eq!(consumer.data_size(), HALF);
        assert_eq!(consumer.read(usize::MAX).len(), HALF);
        assert_eq!(consumer.data_size(), 0);
        assert_eq!(consumer.read(usize::MAX).len(), 0);
    }
    assert_eq!(buffer.head.load(Relaxed), 128);
    assert_eq!(buffer.tail.load(Relaxed), 128);
}