Crate rtrb[][src]

Expand description

A realtime-safe single-producer single-consumer (SPSC) ring buffer.

A RingBuffer consists of two parts: a Producer for writing into the ring buffer and a Consumer for reading from the ring buffer.

A fixed-capacity buffer is allocated on construction. After that, no more memory is allocated (unless the type T does that internally). Reading from and writing into the ring buffer is lock-free and wait-free. All reading and writing functions return immediately. Attempts to write to a full buffer return an error; values inside the buffer are not overwritten. Attempts to read from an empty buffer return an error as well. Only a single thread can write into the ring buffer and a single thread (typically a different one) can read from the ring buffer. If the queue is empty, there is no way for the reading thread to wait for new data, other than trying repeatedly until reading succeeds. Similarly, if the queue is full, there is no way for the writing thread to wait for newly available space to write to, other than trying repeatedly.

Examples

Moving single elements into and out of a queue with Producer::push() and Consumer::pop(), respectively:

use rtrb::{RingBuffer, PushError, PopError};

let (mut producer, mut consumer) = RingBuffer::new(2).split();

assert_eq!(producer.push(10), Ok(()));
assert_eq!(producer.push(20), Ok(()));
assert_eq!(producer.push(30), Err(PushError::Full(30)));

std::thread::spawn(move || {
    assert_eq!(consumer.pop(), Ok(10));
    assert_eq!(consumer.pop(), Ok(20));
    assert_eq!(consumer.pop(), Err(PopError::Empty));
}).join().unwrap();

Producing and consuming multiple items at once with Producer::write_chunk() and Consumer::read_chunk(), respectively. This example uses a single thread for simplicity, but in a real application, producer and consumer would of course live on different threads:

use rtrb::RingBuffer;

let (mut producer, mut consumer) = RingBuffer::new(5).split();

if let Ok(mut chunk) = producer.write_chunk(4) {
    // NB: Don't use `chunk` as the first iterator in zip() if the other one might be shorter!
    for (src, dst) in vec![10, 11, 12].into_iter().zip(&mut chunk) {
        *dst = src;
    }
    // Don't forget to make the written slots available for reading!
    chunk.commit_iterated();
    // Note that we requested 4 slots but we've only written 3!
} else {
    unreachable!();
}

assert_eq!(producer.slots(), 2);
assert_eq!(consumer.slots(), 3);

if let Ok(mut chunk) = consumer.read_chunk(2) {
    // NB: Even though we are just reading, `chunk` needs to be mutable for iteration!
    assert_eq!((&mut chunk).collect::<Vec<_>>(), [&10, &11]);
    chunk.commit_iterated(); // Mark the slots as "consumed"
    // chunk.commit_all() would also work here.
} else {
    unreachable!();
}

// One element is still in the queue:
assert_eq!(consumer.peek(), Ok(&12));

let data = vec![20, 21, 22, 23];
if let Ok(mut chunk) = producer.write_chunk(4) {
    let (first, second) = chunk.as_mut_slices();
    let mid = first.len();
    first.copy_from_slice(&data[..mid]);
    second.copy_from_slice(&data[mid..]);
    chunk.commit_all();
} else {
    unreachable!();
}

assert!(producer.is_full());
assert_eq!(consumer.slots(), 5);

let mut v = Vec::<i32>::with_capacity(5);
if let Ok(chunk) = consumer.read_chunk(5) {
    let (first, second) = chunk.as_slices();
    v.extend(first);
    v.extend(second);
    chunk.commit_all();
} else {
    unreachable!();
}
assert_eq!(v, [12, 20, 21, 22, 23]);
assert!(consumer.is_empty());

Common Access Patterns

The following examples show the Producer side; similar patterns can of course be used with Consumer::read_chunk() as well. Furthermore, the examples use Producer::write_chunk(), which requires the trait bounds T: Copy + Default. If that’s too restrictive or if you want to squeeze out the last bit of performance, you can use Producer::write_chunk_uninit() instead, but this will force you to write some unsafe code.

Copy a whole slice of items into the ring buffer, but only if space permits (if not, the input slice is returned as an error):

use rtrb::Producer;

fn push_entire_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
where
    T: Copy + Default,
{
    if let Ok(mut chunk) = queue.write_chunk(slice.len()) {
        let (first, second) = chunk.as_mut_slices();
        let mid = first.len();
        first.copy_from_slice(&slice[..mid]);
        second.copy_from_slice(&slice[mid..]);
        chunk.commit_all();
        Ok(())
    } else {
        Err(slice)
    }
}

Copy as many items as possible from a given slice, returning the remainder of the slice (which will be empty if there was space for all items):

use rtrb::{Producer, ChunkError::TooFewSlots};

fn push_partial_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> &'a [T]
where
    T: Copy + Default,
{
    let mut chunk = match queue.write_chunk(slice.len()) {
        Ok(chunk) => chunk,
        // This is an optional optimization if the queue tends to be full:
        Err(TooFewSlots(0)) => return slice,
        // Remaining slots are returned, this will always succeed:
        Err(TooFewSlots(n)) => queue.write_chunk(n).unwrap(),
    };
    let end = chunk.len();
    let (first, second) = chunk.as_mut_slices();
    let mid = first.len();
    first.copy_from_slice(&slice[..mid]);
    second.copy_from_slice(&slice[mid..end]);
    chunk.commit_all();
    &slice[end..]
}

Write as many slots as possible, given an iterator (and return the number of written slots):

use rtrb::{Producer, ChunkError::TooFewSlots};

fn push_from_iter<T, I>(queue: &mut Producer<T>, iter: &mut I) -> usize
where
    T: Copy + Default,
    I: Iterator<Item = T>,
{
    let n = match iter.size_hint() {
        (_, None) => queue.slots(),
        (_, Some(n)) => n,
    };
    let mut chunk = match queue.write_chunk(n) {
        Ok(chunk) => chunk,
        // As above, this is an optional optimization:
        Err(TooFewSlots(0)) => return 0,
        // As above, this will always succeed:
        Err(TooFewSlots(n)) => queue.write_chunk(n).unwrap(),
    };
    for (source, target) in iter.zip(&mut chunk) {
        *target = source;
    }
    chunk.commit_iterated()
}

Structs

Consumer

The consumer side of a RingBuffer.

Producer

The producer side of a RingBuffer.

ReadChunk

Structure for reading from multiple slots in one go.

RingBuffer

A bounded single-producer single-consumer queue.

WriteChunk

Structure for writing into multiple (Default-initialized) slots in one go.

WriteChunkUninit

Structure for writing into multiple (uninitialized) slots in one go.

Enums

ChunkError

Error type for Consumer::read_chunk(), Producer::write_chunk() and Producer::write_chunk_uninit().

PeekError

Error type for Consumer::peek().

PopError

Error type for Consumer::pop().

PushError

Error type for Producer::push().

Traits

CopyToUninit

Extension trait used to provide a copy_to_uninit() method on built-in slices.