Crate rtrb
A realtime-safe single-producer single-consumer (SPSC) ring buffer.
A RingBuffer consists of two parts: a Producer for writing into the ring buffer and a Consumer for reading from the ring buffer.
A fixed-capacity buffer is allocated on construction.
After that, no more memory is allocated (unless the type T does that internally).
Reading from and writing into the ring buffer is lock-free and wait-free.
All reading and writing functions return immediately.
Attempts to write to a full buffer return an error;
values inside the buffer are not overwritten.
Attempts to read from an empty buffer return an error as well.
Only a single thread can write into the ring buffer and a single thread
(typically a different one) can read from the ring buffer.
If the queue is empty, there is no way for the reading thread to wait
for new data, other than trying repeatedly until reading succeeds.
Similarly, if the queue is full, there is no way for the writing thread
to wait for newly available space to write to, other than trying repeatedly.
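The "trying repeatedly" approach can be sketched with a small generic helper. This is only an illustration using the standard library: `retry_spin` and its `max_tries` parameter are hypothetical names, not part of rtrb, and the closure stands in for a call such as `consumer.pop()`. Whether busy-waiting is acceptable at all depends on the application; a realtime thread would typically skip the work and try again in its next cycle instead.

```rust
use std::hint;

/// Hypothetical helper (not part of rtrb): calls `attempt` until it
/// returns `Ok`, giving up after `max_tries` attempts.
fn retry_spin<T, E>(max_tries: usize, mut attempt: impl FnMut() -> Result<T, E>) -> Option<T> {
    for _ in 0..max_tries {
        if let Ok(value) = attempt() {
            return Some(value);
        }
        // Hint to the CPU that we are inside a busy-wait loop.
        hint::spin_loop();
    }
    None
}

fn main() {
    // Stand-in for a queue that only has data on the third attempt.
    let mut calls = 0;
    let result = retry_spin(10, || {
        calls += 1;
        if calls < 3 { Err("empty") } else { Ok(42) }
    });
    assert_eq!(result, Some(42));
    assert_eq!(calls, 3);

    // A source that never succeeds makes the helper give up.
    let never: Option<i32> = retry_spin(5, || Err::<i32, ()>(()));
    assert_eq!(never, None);
}
```

With a real queue, the closure could be something like `|| consumer.pop()`; bounding the number of attempts keeps the caller from spinning forever if the other thread has stopped.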
Examples
Moving single elements into and out of a queue with Producer::push() and Consumer::pop(), respectively:
    use rtrb::{RingBuffer, PushError, PopError};

    let (mut producer, mut consumer) = RingBuffer::new(2).split();

    assert_eq!(producer.push(10), Ok(()));
    assert_eq!(producer.push(20), Ok(()));
    assert_eq!(producer.push(30), Err(PushError::Full(30)));

    std::thread::spawn(move || {
        assert_eq!(consumer.pop(), Ok(10));
        assert_eq!(consumer.pop(), Ok(20));
        assert_eq!(consumer.pop(), Err(PopError::Empty));
    }).join().unwrap();
Producing and consuming multiple items at once with Producer::write_chunk() and Consumer::read_chunk(), respectively.
This example uses a single thread for simplicity, but in a real application, producer and consumer would of course live on different threads:
    use rtrb::RingBuffer;

    let (mut producer, mut consumer) = RingBuffer::new(5).split();

    if let Ok(mut chunk) = producer.write_chunk(4) {
        // NB: Don't use `chunk` as the first iterator in zip() if the other one might be shorter!
        for (src, dst) in vec![10, 11, 12].into_iter().zip(&mut chunk) {
            *dst = src;
        }
        // Don't forget to make the written slots available for reading!
        chunk.commit_iterated();
        // Note that we requested 4 slots but we've only written 3!
    } else {
        unreachable!();
    }

    assert_eq!(producer.slots(), 2);
    assert_eq!(consumer.slots(), 3);

    if let Ok(mut chunk) = consumer.read_chunk(2) {
        // NB: Even though we are just reading, `chunk` needs to be mutable for iteration!
        assert_eq!((&mut chunk).collect::<Vec<_>>(), [&10, &11]);
        chunk.commit_iterated(); // Mark the slots as "consumed"
        // chunk.commit_all() would also work here.
    } else {
        unreachable!();
    }

    // One element is still in the queue:
    assert_eq!(consumer.peek(), Ok(&12));

    let data = vec![20, 21, 22, 23];
    if let Ok(mut chunk) = producer.write_chunk(4) {
        let (first, second) = chunk.as_mut_slices();
        let mid = first.len();
        first.copy_from_slice(&data[..mid]);
        second.copy_from_slice(&data[mid..]);
        chunk.commit_all();
    } else {
        unreachable!();
    }

    assert!(producer.is_full());
    assert_eq!(consumer.slots(), 5);

    let mut v = Vec::<i32>::with_capacity(5);
    if let Ok(chunk) = consumer.read_chunk(5) {
        let (first, second) = chunk.as_slices();
        v.extend(first);
        v.extend(second);
        chunk.commit_all();
    } else {
        unreachable!();
    }
    assert_eq!(v, [12, 20, 21, 22, 23]);
    assert!(consumer.is_empty());
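The calls to as_mut_slices() and as_slices() in the example each yield two slices because a run of slots in a circular buffer may wrap around the end of the underlying storage. The following sketch models only the index arithmetic, under the assumption of a plain circular array; `wrapping_ranges` is a hypothetical name and this is not rtrb's actual implementation.

```rust
use std::ops::Range;

/// Hypothetical model (not rtrb's implementation): splits a run of `len`
/// slots beginning at physical index `start` into at most two contiguous
/// index ranges of a circular storage with `capacity` slots.
fn wrapping_ranges(capacity: usize, start: usize, len: usize) -> (Range<usize>, Range<usize>) {
    assert!(start < capacity && len <= capacity);
    // Slots that fit before the end of the storage form the first range.
    let first_len = len.min(capacity - start);
    // Whatever is left over continues at the beginning of the storage.
    (start..start + first_len, 0..len - first_len)
}

fn main() {
    // Capacity 5, three slots already written from the start, four more
    // requested: two slots fit at the end, two wrap around to the front.
    assert_eq!(wrapping_ranges(5, 3, 4), (3..5, 0..2));
    // No wrap-around: the second range is empty.
    assert_eq!(wrapping_ranges(5, 0, 3), (0..3, 0..0));
}
```

This is why the examples copy `data[..mid]` into the first slice and `data[mid..]` into the second: together the two slices cover the requested slots in order, and the second one is simply empty when no wrap-around occurs.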
Common Access Patterns
The following examples show the Producer side; similar patterns can of course be used with Consumer::read_chunk() as well.
Furthermore, the examples use Producer::write_chunk(), which requires the trait bounds T: Copy + Default.
If that’s too restrictive or if you want to squeeze out the last bit of performance, you can use Producer::write_chunk_uninit() instead, but this will force you to write some unsafe code.
Copy a whole slice of items into the ring buffer, but only if space permits (if not, the input slice is returned as an error):
    use rtrb::Producer;

    fn push_entire_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> Result<(), &'a [T]>
    where
        T: Copy + Default,
    {
        if let Ok(mut chunk) = queue.write_chunk(slice.len()) {
            let (first, second) = chunk.as_mut_slices();
            let mid = first.len();
            first.copy_from_slice(&slice[..mid]);
            second.copy_from_slice(&slice[mid..]);
            chunk.commit_all();
            Ok(())
        } else {
            Err(slice)
        }
    }
Copy as many items as possible from a given slice, returning the remainder of the slice (which will be empty if there was space for all items):
    use rtrb::{Producer, ChunkError::TooFewSlots};

    fn push_partial_slice<'a, T>(queue: &mut Producer<T>, slice: &'a [T]) -> &'a [T]
    where
        T: Copy + Default,
    {
        let mut chunk = match queue.write_chunk(slice.len()) {
            Ok(chunk) => chunk,
            // This is an optional optimization if the queue tends to be full:
            Err(TooFewSlots(0)) => return slice,
            // Remaining slots are returned, this will always succeed:
            Err(TooFewSlots(n)) => queue.write_chunk(n).unwrap(),
        };
        let end = chunk.len();
        let (first, second) = chunk.as_mut_slices();
        let mid = first.len();
        first.copy_from_slice(&slice[..mid]);
        second.copy_from_slice(&slice[mid..end]);
        chunk.commit_all();
        &slice[end..]
    }
Write as many slots as possible, given an iterator (and return the number of written slots):
    use rtrb::{Producer, ChunkError::TooFewSlots};

    fn push_from_iter<T, I>(queue: &mut Producer<T>, iter: &mut I) -> usize
    where
        T: Copy + Default,
        I: Iterator<Item = T>,
    {
        let n = match iter.size_hint() {
            (_, None) => queue.slots(),
            (_, Some(n)) => n,
        };
        let mut chunk = match queue.write_chunk(n) {
            Ok(chunk) => chunk,
            // As above, this is an optional optimization:
            Err(TooFewSlots(0)) => return 0,
            // As above, this will always succeed:
            Err(TooFewSlots(n)) => queue.write_chunk(n).unwrap(),
        };
        for (source, target) in iter.zip(&mut chunk) {
            *target = source;
        }
        chunk.commit_iterated()
    }
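The iterator-based pattern above picks the number of slots to request from the upper bound of Iterator::size_hint(). As a rough illustration of how that bound behaves for a few standard iterators, the sketch below isolates the same match expression in a standalone function; `slots_to_request` and `free_slots` are hypothetical names, with `free_slots` standing in for the value of queue.slots().

```rust
/// Hypothetical helper mirroring the `size_hint()` match used above:
/// request the iterator's upper bound if it is known, otherwise request
/// all currently free slots.
fn slots_to_request<I: Iterator>(iter: &I, free_slots: usize) -> usize {
    match iter.size_hint() {
        (_, None) => free_slots,
        (_, Some(n)) => n,
    }
}

fn main() {
    // A range knows its exact length.
    assert_eq!(slots_to_request(&(0..3), 8), 3);
    // An endless iterator has no upper bound, so all free slots are requested.
    assert_eq!(slots_to_request(&std::iter::repeat(1), 8), 8);
    // `filter` may drop any item, but its upper bound is still the length
    // of the underlying range, which can exceed the free space.
    assert_eq!(slots_to_request(&(0..10).filter(|x| x % 2 == 0), 8), 10);
}
```

The last case shows why the pattern still needs the TooFewSlots fallback: the upper bound can be larger than both the free space and the number of items the iterator actually yields, which is also why the function returns the count from commit_iterated() rather than the requested size.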
Structs
Consumer | The consumer side of a RingBuffer. |
Producer | The producer side of a RingBuffer. |
ReadChunk | Structure for reading from multiple slots in one go. |
RingBuffer | A bounded single-producer single-consumer queue. |
WriteChunk | Structure for writing into multiple (Default-initialized) slots in one go. |
WriteChunkUninit | Structure for writing into multiple (uninitialized) slots in one go. |
Enums
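ChunkError | Error type for Consumer::read_chunk(), Producer::write_chunk() and Producer::write_chunk_uninit(). |
PeekError | Error type for Consumer::peek(). |
PopError | Error type for Consumer::pop(). |
PushError | Error type for Producer::push(). |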
Traits
CopyToUninit | Extension trait used to provide a copy_to_uninit() method on built-in slices. |