Struct ringbuf::ring_buffer::SharedRb
source · pub struct SharedRb<T, C: Container<T>> { /* private fields */ }
Expand description
Ring buffer that can be shared between threads.
Implements `Sync` if `T` implements `Send`, and therefore its `Producer` and `Consumer` implement `Send`.
Note that there is no explicit requirement of `T: Send`. Instead, `SharedRb` will work just fine even with `T: !Send` until you try to send its `Producer` or `Consumer` to another thread.
use std::{thread, vec::Vec};
use ringbuf::SharedRb;
// Build a 256-slot ring buffer of `i32` and split it into its two endpoints.
let (mut prod, mut cons) = SharedRb::<i32, Vec<_>>::new(256).split();

// The producer endpoint is moved into its own thread and pushes one value.
// `join().unwrap()` re-raises any panic from that thread; ignoring the
// `Result` returned by `join` would silently hide a failed `push`.
thread::spawn(move || {
    prod.push(123).unwrap();
})
.join()
.unwrap();

// The consumer endpoint is moved into a second thread, started only after the
// producer thread has finished, and checks that it pops the pushed value.
// Without `unwrap()` a failing `assert_eq!` here would go unnoticed.
thread::spawn(move || {
    assert_eq!(cons.pop().unwrap(), 123);
})
.join()
.unwrap();
Implementations§
sourcepub fn new(capacity: usize) -> Self
pub fn new(capacity: usize) -> Self
Creates a new instance of a ring buffer.
Panics if allocation fails or `capacity` is zero.
Examples found in repository?
More examples
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/// Single-threaded walk-through of a two-slot ring buffer: fill it, observe a
/// rejected push, then interleave pops and pushes until it is empty again.
fn main() {
    let (mut producer, mut consumer) = HeapRb::<i32>::new(2).split();

    // Fill both slots.
    for value in 0..2 {
        producer.push(value).unwrap();
    }
    // The buffer is full, so the rejected value is handed back in `Err`.
    assert_eq!(producer.push(2), Err(2));

    // Popping the oldest item frees one slot for the value rejected above.
    assert_eq!(consumer.pop().unwrap(), 0);
    producer.push(2).unwrap();

    // The remaining items come out in insertion order, then the buffer is empty.
    assert_eq!(consumer.pop().unwrap(), 1);
    assert_eq!(consumer.pop().unwrap(), 2);
    assert_eq!(consumer.pop(), None);
}
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// Sends a text message from a producer thread to a consumer thread through a
// ring buffer created with capacity 10, then checks that the received text
// equals the text that was sent.
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
// Producer thread: feeds the message bytes followed by a single 0 byte.
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
// The trailing zero byte is what the consumer thread looks for to decide
// that the whole message has arrived.
let zero = [0];
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
// No room right now: back off briefly and poll again.
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
// NOTE(review): `None` is presumably "no limit on the byte count" — confirm
// against the `read_from` documentation.
let n = prod.read_from(&mut bytes, None).unwrap();
// Zero bytes transferred while the buffer was not full is treated as the
// source being exhausted, which ends the sending loop.
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
// Consumer thread: collects bytes into a Vec until the 0 terminator is seen.
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
// Stop only when the buffer is empty AND the last collected byte is the
// terminator; otherwise more data is still expected, so wait and poll again.
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
// NOTE(review): `None` is presumably "no limit on the byte count" — confirm
// against the `write_into` documentation.
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
// Remove the terminator (and verify it really was 0) before decoding the text.
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
// The decoded message is the thread's return value, retrieved via `join` below.
msg
});
// Unwrapping both join results propagates a panic from either thread.
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
sourcepub fn try_new(capacity: usize) -> Result<Self, TryReserveError>
pub fn try_new(capacity: usize) -> Result<Self, TryReserveError>
Creates a new instance of a ring buffer returning an error if allocation failed.
Panics if `capacity` is zero.
sourcepub unsafe fn from_raw_parts(container: C, head: usize, tail: usize) -> Self
pub unsafe fn from_raw_parts(container: C, head: usize, tail: usize) -> Self
sourcepub unsafe fn into_raw_parts(self) -> (C, usize, usize)
pub unsafe fn into_raw_parts(self) -> (C, usize, usize)
Destructures ring buffer into underlying container and `head` and `tail` counters.
Safety
Initialized contents of the container must be properly dropped.
sourcepub fn split(self) -> (Producer<T, Arc<Self>>, Consumer<T, Arc<Self>>)where
Self: Sized,
pub fn split(self) -> (Producer<T, Arc<Self>>, Consumer<T, Arc<Self>>)where Self: Sized,
Splits ring buffer into producer and consumer.
This method consumes the ring buffer and puts it on the heap in an `Arc`. If you don’t want to use the heap, then see `Self::split_ref`.
Examples found in repository?
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
/// Single-threaded walk-through of a two-slot ring buffer: fill it, observe a
/// rejected push, then interleave pops and pushes until it is empty again.
fn main() {
    let (mut producer, mut consumer) = HeapRb::<i32>::new(2).split();

    // Fill both slots.
    for value in 0..2 {
        producer.push(value).unwrap();
    }
    // The buffer is full, so the rejected value is handed back in `Err`.
    assert_eq!(producer.push(2), Err(2));

    // Popping the oldest item frees one slot for the value rejected above.
    assert_eq!(consumer.pop().unwrap(), 0);
    producer.push(2).unwrap();

    // The remaining items come out in insertion order, then the buffer is empty.
    assert_eq!(consumer.pop().unwrap(), 1);
    assert_eq!(consumer.pop().unwrap(), 2);
    assert_eq!(consumer.pop(), None);
}
More examples
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// Sends a text message from a producer thread to a consumer thread through a
// ring buffer created with capacity 10, then checks that the received text
// equals the text that was sent.
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
// Producer thread: feeds the message bytes followed by a single 0 byte.
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
// The trailing zero byte is what the consumer thread looks for to decide
// that the whole message has arrived.
let zero = [0];
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
// No room right now: back off briefly and poll again.
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
// NOTE(review): `None` is presumably "no limit on the byte count" — confirm
// against the `read_from` documentation.
let n = prod.read_from(&mut bytes, None).unwrap();
// Zero bytes transferred while the buffer was not full is treated as the
// source being exhausted, which ends the sending loop.
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
// Consumer thread: collects bytes into a Vec until the 0 terminator is seen.
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
// Stop only when the buffer is empty AND the last collected byte is the
// terminator; otherwise more data is still expected, so wait and poll again.
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
// NOTE(review): `None` is presumably "no limit on the byte count" — confirm
// against the `write_into` documentation.
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
// Remove the terminator (and verify it really was 0) before decoding the text.
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
// The decoded message is the thread's return value, retrieved via `join` below.
msg
});
// Unwrapping both join results propagates a panic from either thread.
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
Trait Implementations§
source§fn as_slices(&self) -> (&[T], &[T])
fn as_slices(&self) -> (&[T], &[T])
source§fn as_mut_slices(&mut self) -> (&mut [T], &mut [T])
fn as_mut_slices(&mut self) -> (&mut [T], &mut [T])
source§fn pop(&mut self) -> Option<T>
fn pop(&mut self) -> Option<T>
source§fn pop_iter(&mut self) -> PopIterator<'_, T, RbWrap<Self>> ⓘ
fn pop_iter(&mut self) -> PopIterator<'_, T, RbWrap<Self>> ⓘ
source§fn iter(&self) -> Chain<Iter<'_, T>, Iter<'_, T>>
fn iter(&self) -> Chain<Iter<'_, T>, Iter<'_, T>>
source§fn iter_mut(&mut self) -> Chain<IterMut<'_, T>, IterMut<'_, T>>
fn iter_mut(&mut self) -> Chain<IterMut<'_, T>, IterMut<'_, T>>
source§fn skip(&mut self, count: usize) -> usize
fn skip(&mut self, count: usize) -> usize
n
items from the buffer and safely drops them. Read moresource§fn clear(&mut self) -> usize
fn clear(&mut self) -> usize
source§fn push_overwrite(&mut self, elem: T) -> Option<T>
fn push_overwrite(&mut self, elem: T) -> Option<T>
source§fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I)
fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I)
source§fn push_iter_overwrite<I: Iterator<Item = T>>(&mut self, iter: I)
fn push_iter_overwrite<I: Iterator<Item = T>>(&mut self, iter: I)
source§unsafe fn slices(
&self,
head: usize,
tail: usize
) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
unsafe fn slices( &self, head: usize, tail: usize ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
source§fn capacity_nonzero(&self) -> NonZeroUsize
fn capacity_nonzero(&self) -> NonZeroUsize
source§fn modulus(&self) -> NonZeroUsize
fn modulus(&self) -> NonZeroUsize
source§fn occupied_len(&self) -> usize
fn occupied_len(&self) -> usize
source§fn vacant_len(&self) -> usize
fn vacant_len(&self) -> usize
source§unsafe fn advance_head(&self, count: usize)
unsafe fn advance_head(&self, count: usize)
count
items forward. Read moresource§fn occupied_ranges(&self) -> (Range<usize>, Range<usize>)
fn occupied_ranges(&self) -> (Range<usize>, Range<usize>)
Self::occupied_slices
location in underlying container.source§unsafe fn occupied_slices(
&self
) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
unsafe fn occupied_slices( &self ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
source§unsafe fn advance_tail(&self, count: usize)
unsafe fn advance_tail(&self, count: usize)
count
items forward. Read moresource§fn vacant_ranges(&self) -> (Range<usize>, Range<usize>)
fn vacant_ranges(&self) -> (Range<usize>, Range<usize>)
Self::vacant_slices
location in underlying container.