pub struct Consumer<T, R: RbRef>where
R::Rb: RbRead<T>,{ /* private fields */ }
Expand description
Consumer part of ring buffer.
Mode
It can operate in immediate (by default) or postponed mode.
The mode can be switched using the Self::postponed / Self::into_postponed and Self::into_immediate methods.
- In immediate mode removed and inserted items are automatically synchronized with the other end.
- In postponed mode, synchronization occurs only when Self::sync or Self::into_immediate is called, or when Self is dropped. The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.
Implementations§
source§impl<T, R: RbRef> Consumer<T, R>where
R::Rb: RbRead<T>,
impl<T, R: RbRef> Consumer<T, R>where R::Rb: RbRead<T>,
sourcepub unsafe fn new(target: R) -> Self
pub unsafe fn new(target: R) -> Self
Creates consumer from the ring buffer reference.
Safety
There must be only one consumer containing the same ring buffer reference.
sourcepub fn into_rb_ref(self) -> R
pub fn into_rb_ref(self) -> R
Consumes self
and returns underlying ring buffer reference.
sourcepub fn postponed(&mut self) -> Consumer<T, RbWrap<RbReadCache<T, &R::Rb>>>
pub fn postponed(&mut self) -> Consumer<T, RbWrap<RbReadCache<T, &R::Rb>>>
Returns postponed consumer that borrows Self
.
sourcepub fn into_postponed(self) -> Consumer<T, RbWrap<RbReadCache<T, R>>>
pub fn into_postponed(self) -> Consumer<T, RbWrap<RbReadCache<T, R>>>
Transforms Self
into postponed consumer.
sourcepub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
Returns capacity of the ring buffer.
The capacity of the buffer is constant.
sourcepub fn is_empty(&self) -> bool
pub fn is_empty(&self) -> bool
Checks if the ring buffer is empty.
The result may become irrelevant at any time because of concurrent producer activity.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// Example: sends a zero-terminated text message from a producer thread to a
// consumer thread through a ring buffer created with `HeapRb::<u8>::new(10)`,
// then checks that the received text equals the sent text.
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
// Producer thread: feeds the message bytes followed by a single zero byte.
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
let zero = [0];
// The chained zero byte is what the consumer later checks for to know the message is complete.
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
// No free space right now: sleep briefly, then check again.
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
let n = prod.read_from(&mut bytes, None).unwrap();
// Nothing was transferred although the buffer was not full: stop sending.
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
// Consumer thread: drains the ring buffer into a Vec until the terminator arrives.
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
// Finish only when the buffer is empty AND the last received byte is the zero terminator.
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
// Strip the zero terminator before decoding the bytes as UTF-8.
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
msg
});
// Wait for both threads; the consumer thread returns the decoded message.
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
The number of items stored in the buffer.
The actual number may be greater than the returned value because of concurrent producer activity.
sourcepub fn free_len(&self) -> usize
pub fn free_len(&self) -> usize
The number of remaining free places in the buffer.
The actual number may be less than the returned value because of concurrent producer activity.
sourcepub unsafe fn as_uninit_slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])
pub unsafe fn as_uninit_slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])
Provides a direct access to the ring buffer occupied memory.
The difference from Self::as_slices
is that this method provides slices of MaybeUninit<T>
, so items may be moved out of slices.
Returns a pair of slices of stored items; the second one may be empty. Elements with lower indices in a slice are older. The first slice contains older items than the second one.
Safety
All items are initialized. Elements must be removed starting from the beginning of first slice. When all items are removed from the first slice then items must be removed from the beginning of the second slice.
This method must be followed by Self::advance
call with the number of items being removed previously as argument.
No other mutating calls are allowed before that.
sourcepub unsafe fn as_mut_uninit_slices(
&self
) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
pub unsafe fn as_mut_uninit_slices( &self ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
Provides a direct mutable access to the ring buffer occupied memory.
Same as Self::as_uninit_slices
.
Safety
sourcepub unsafe fn advance(&mut self, count: usize)
pub unsafe fn advance(&mut self, count: usize)
Moves head
target by count
places.
Safety
First count
items in occupied memory must be moved out or dropped.
sourcepub fn as_slices(&self) -> (&[T], &[T])
pub fn as_slices(&self) -> (&[T], &[T])
Returns a pair of slices which contain, in order, the contents of the ring buffer.
sourcepub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T])
pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T])
Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.
sourcepub fn pop(&mut self) -> Option<T>
pub fn pop(&mut self) -> Option<T>
Removes the oldest item from the ring buffer and returns it.
Returns None
if the ring buffer is empty.
Examples found in repository?
More examples
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
// Example: push/pop round trip on a ring buffer created with `HeapRb::<i32>::new(2)`.
fn main() {
    let ring = HeapRb::<i32>::new(2);
    let (mut producer, mut consumer) = ring.split();

    // Two pushes succeed.
    for value in 0..2 {
        producer.push(value).unwrap();
    }
    // The third push is rejected and the item is handed back in `Err`.
    assert_eq!(producer.push(2), Err(2));

    // Popping the first-pushed item makes room for one more push.
    assert_eq!(consumer.pop().unwrap(), 0);
    producer.push(2).unwrap();

    // The remaining items come out in the order they were pushed.
    for expected in 1..=2 {
        assert_eq!(consumer.pop().unwrap(), expected);
    }
    // Nothing is left afterwards.
    assert_eq!(consumer.pop(), None);
}
sourcepub fn pop_iter(&mut self) -> PopIterator<'_, T, R> ⓘ
pub fn pop_iter(&mut self) -> PopIterator<'_, T, R> ⓘ
Returns an iterator that removes items one by one from the ring buffer.
Iterator provides only items that are available for consumer at the moment of pop_iter
call, it will not contain new items added after it was created.
Information about removed items is committed to the buffer only when the iterator is destroyed.
sourcepub fn iter(&self) -> impl Iterator<Item = &T> + '_
pub fn iter(&self) -> impl Iterator<Item = &T> + '_
Returns a front-to-back iterator containing references to items in the ring buffer.
This iterator does not remove items out of the ring buffer.
sourcepub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_
pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_
Returns a front-to-back iterator that returns mutable references to items in the ring buffer.
This iterator does not remove items out of the ring buffer.
sourcepub fn skip(&mut self, count: usize) -> usize
pub fn skip(&mut self, count: usize) -> usize
Removes at most count and at least min(count, Self::len()) items from the buffer and safely drops them.
If there is no concurrent producer activity then exactly min(count, Self::len()) items are removed.
Returns the number of deleted items.
// Push eight items into a buffer created with `HeapRb::<i32>::new(8)`.
let ring = HeapRb::<i32>::new(8);
let (mut producer, mut consumer) = ring.split();
let mut source = 0..8;
assert_eq!(producer.push_iter(&mut source), 8);

// Each pair is (requested count, number of items `skip` reports as removed).
for (requested, removed) in [(4, 4), (8, 4), (8, 0)] {
    assert_eq!(consumer.skip(requested), removed);
}
source§impl<T, R: RbRef> Consumer<T, RbWrap<RbReadCache<T, R>>>where
R::Rb: RbRead<T>,
impl<T, R: RbRef> Consumer<T, RbWrap<RbReadCache<T, R>>>where R::Rb: RbRead<T>,
sourcepub unsafe fn new_postponed(target: R) -> Self
pub unsafe fn new_postponed(target: R) -> Self
Create new postponed consumer.
Safety
There must be only one consumer containing the same ring buffer reference.
sourcepub fn sync(&mut self)
pub fn sync(&mut self)
Synchronize changes with the ring buffer.
Postponed consumer requires manual synchronization to make freed space visible for the producer.
sourcepub fn into_immediate(self) -> Consumer<T, R>
pub fn into_immediate(self) -> Consumer<T, R>
Synchronize and transform back to immediate consumer.
source§impl<R: RbRef> Consumer<u8, R>where
R::Rb: RbRead<u8>,
impl<R: RbRef> Consumer<u8, R>where R::Rb: RbRead<u8>,
sourcepub fn write_into<P: Write>(
&mut self,
writer: &mut P,
count: Option<usize>
) -> Result<usize>
pub fn write_into<P: Write>( &mut self, writer: &mut P, count: Option<usize> ) -> Result<usize>
Removes at most the first count bytes from the ring buffer and writes them into a Write instance.
If count is None then as many bytes as possible will be written.
Returns Ok(n) if write succeeded. n is the number of bytes written.
n == 0 means that either write returned zero or the ring buffer is empty.
If write fails then the original error is returned. In this case it is guaranteed that no items were written to the writer.
To achieve this, only one contiguous slice is written at once, so this call may write fewer than len items even if the writer is ready to accept more.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
// Example: sends a zero-terminated text message from a producer thread to a
// consumer thread through a ring buffer created with `HeapRb::<u8>::new(10)`,
// then checks that the received text equals the sent text.
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
// Producer thread: feeds the message bytes followed by a single zero byte.
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
let zero = [0];
// The chained zero byte is what the consumer later checks for to know the message is complete.
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
// No free space right now: sleep briefly, then check again.
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
let n = prod.read_from(&mut bytes, None).unwrap();
// Nothing was transferred although the buffer was not full: stop sending.
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
// Consumer thread: drains the ring buffer into a Vec until the terminator arrives.
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
// Finish only when the buffer is empty AND the last received byte is the zero terminator.
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
// `write_into` is the u8-specific method that moves buffered bytes into a `Write` target.
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
// Strip the zero terminator before decoding the bytes as UTF-8.
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
msg
});
// Wait for both threads; the consumer thread returns the decoded message.
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
Trait Implementations§
source§impl<R: RbRef> Read for Consumer<u8, R>where
R::Rb: RbRead<u8>,
impl<R: RbRef> Read for Consumer<u8, R>where R::Rb: RbRead<u8>,
source§fn read(&mut self, buffer: &mut [u8]) -> Result<usize>
fn read(&mut self, buffer: &mut [u8]) -> Result<usize>
1.36.0 · source§fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>
fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>
Like read, except that it reads into a slice of buffers. Read more
source§fn is_read_vectored(&self) -> bool
fn is_read_vectored(&self) -> bool
This is a nightly-only experimental API. (can_vector)
1.0.0 · source§fn read_to_end(&mut self, buf: &mut Vec<u8, Global>) -> Result<usize, Error>
fn read_to_end(&mut self, buf: &mut Vec<u8, Global>) -> Result<usize, Error>
Read all bytes until EOF in this source, placing them into buf. Read more
1.0.0 · source§fn read_to_string(&mut self, buf: &mut String) -> Result<usize, Error>
fn read_to_string(&mut self, buf: &mut String) -> Result<usize, Error>
Read all bytes until EOF in this source, appending them to buf. Read more
1.6.0 · source§fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error>
fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error>
Read the exact number of bytes required to fill buf. Read more
source§fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>
fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>
This is a nightly-only experimental API. (read_buf)
source§fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>
fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>
This is a nightly-only experimental API. (read_buf)
Read the exact number of bytes required to fill cursor. Read more
1.0.0 · source§fn by_ref(&mut self) -> &mut Self where
Self: Sized,
fn by_ref(&mut self) -> &mut Selfwhere Self: Sized,
Creates a "by reference" adaptor for this instance of Read. Read more