Struct ringbuf::consumer::Consumer

source ·
pub struct Consumer<T, R: RbRef>where
    R::Rb: RbRead<T>,{ /* private fields */ }
Expand description

Consumer part of ring buffer.

Mode

It can operate in immediate (by default) or postponed mode. Mode could be switched using Self::postponed/Self::into_postponed and Self::into_immediate methods.

  • In immediate mode removed and inserted items are automatically synchronized with the other end.
  • In postponed mode synchronization occurs only when Self::sync or Self::into_immediate is called or when Self is dropped. The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.

Implementations§

source§

impl<T, R: RbRef> Consumer<T, R>where R::Rb: RbRead<T>,

source

pub unsafe fn new(target: R) -> Self

Creates consumer from the ring buffer reference.

Safety

There must be only one consumer containing the same ring buffer reference.

source

pub fn rb(&self) -> &R::Rb

Returns reference to the underlying ring buffer.

source

pub fn into_rb_ref(self) -> R

Consumes self and returns underlying ring buffer reference.

source

pub fn postponed(&mut self) -> Consumer<T, RbWrap<RbReadCache<T, &R::Rb>>>

Returns postponed consumer that borrows Self.

source

pub fn into_postponed(self) -> Consumer<T, RbWrap<RbReadCache<T, R>>>

Transforms Self into postponed consumer.

source

pub fn capacity(&self) -> usize

Returns capacity of the ring buffer.

The capacity of the buffer is constant.

source

pub fn is_empty(&self) -> bool

Checks if the ring buffer is empty.

The result may become irrelevant at any time because of concurring producer activity.

Examples found in repository?
examples/message.rs (line 37)
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
fn main() {
    let buf = HeapRb::<u8>::new(10);
    let (mut prod, mut cons) = buf.split();

    let smsg = "The quick brown fox jumps over the lazy dog";

    let pjh = thread::spawn(move || {
        println!("-> sending message: '{}'", smsg);

        let zero = [0];
        let mut bytes = smsg.as_bytes().chain(&zero[..]);
        loop {
            if prod.is_full() {
                println!("-> buffer is full, waiting");
                thread::sleep(Duration::from_millis(1));
            } else {
                let n = prod.read_from(&mut bytes, None).unwrap();
                if n == 0 {
                    break;
                }
                println!("-> {} bytes sent", n);
            }
        }

        println!("-> message sent");
    });

    let cjh = thread::spawn(move || {
        println!("<- receiving message");

        let mut bytes = Vec::<u8>::new();
        loop {
            if cons.is_empty() {
                if bytes.ends_with(&[0]) {
                    break;
                } else {
                    println!("<- buffer is empty, waiting");
                    thread::sleep(Duration::from_millis(1));
                }
            } else {
                let n = cons.write_into(&mut bytes, None).unwrap();
                println!("<- {} bytes received", n);
            }
        }

        assert_eq!(bytes.pop().unwrap(), 0);
        let msg = String::from_utf8(bytes).unwrap();
        println!("<- message received: '{}'", msg);

        msg
    });

    pjh.join().unwrap();
    let rmsg = cjh.join().unwrap();

    assert_eq!(smsg, rmsg);
}
source

pub fn is_full(&self) -> bool

Checks if the ring buffer is full.

source

pub fn len(&self) -> usize

The number of items stored in the buffer.

Actual number may be greater than the returned value because of concurring producer activity.

source

pub fn free_len(&self) -> usize

The number of remaining free places in the buffer.

Actual number may be less than the returned value because of concurring producer activity.

source

pub unsafe fn as_uninit_slices(&self) -> (&[MaybeUninit<T>], &[MaybeUninit<T>])

Provides a direct access to the ring buffer occupied memory. The difference from Self::as_slices is that this method provides slices of MaybeUninit<T>, so items may be moved out of slices.

Returns a pair of slices of stored items, the second one may be empty. Elements with lower indices in slice are older. First slice contains older items that second one.

Safety

All items are initialized. Elements must be removed starting from the beginning of first slice. When all items are removed from the first slice then items must be removed from the beginning of the second slice.

This method must be followed by Self::advance call with the number of items being removed previously as argument. No other mutating calls allowed before that.

source

pub unsafe fn as_mut_uninit_slices( &self ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])

Provides a direct mutable access to the ring buffer occupied memory.

Same as Self::as_uninit_slices.

Safety

See Self::as_uninit_slices.

source

pub unsafe fn advance(&mut self, count: usize)

Moves head target by count places.

Safety

First count items in occupied memory must be moved out or dropped.

source

pub fn as_slices(&self) -> (&[T], &[T])

Returns a pair of slices which contain, in order, the contents of the ring buffer.

source

pub fn as_mut_slices(&mut self) -> (&mut [T], &mut [T])

Returns a pair of mutable slices which contain, in order, the contents of the ring buffer.

source

pub fn pop(&mut self) -> Option<T>

Removes latest item from the ring buffer and returns it.

Returns None if the ring buffer is empty.

Examples found in repository?
examples/static.rs (line 13)
5
6
7
8
9
10
11
12
13
14
15
fn main() {
    const RB_SIZE: usize = 1;
    let mut rb = StaticRb::<i32, RB_SIZE>::default();
    let (mut prod, mut cons) = rb.split_ref();

    assert_eq!(prod.push(123), Ok(()));
    assert_eq!(prod.push(321), Err(321));

    assert_eq!(cons.pop(), Some(123));
    assert_eq!(cons.pop(), None);
}
More examples
Hide additional examples
examples/simple.rs (line 11)
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
fn main() {
    let rb = HeapRb::<i32>::new(2);
    let (mut prod, mut cons) = rb.split();

    prod.push(0).unwrap();
    prod.push(1).unwrap();
    assert_eq!(prod.push(2), Err(2));

    assert_eq!(cons.pop().unwrap(), 0);

    prod.push(2).unwrap();

    assert_eq!(cons.pop().unwrap(), 1);
    assert_eq!(cons.pop().unwrap(), 2);
    assert_eq!(cons.pop(), None);
}
source

pub fn pop_iter(&mut self) -> PopIterator<'_, T, R>

Returns an iterator that removes items one by one from the ring buffer.

Iterator provides only items that are available for consumer at the moment of pop_iter call, it will not contain new items added after it was created.

Information about removed items is commited to the buffer only when iterator is destroyed.

source

pub fn iter(&self) -> impl Iterator<Item = &T> + '_

Returns a front-to-back iterator containing references to items in the ring buffer.

This iterator does not remove items out of the ring buffer.

source

pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut T> + '_

Returns a front-to-back iterator that returns mutable references to items in the ring buffer.

This iterator does not remove items out of the ring buffer.

source

pub fn skip(&mut self, count: usize) -> usize

Removes at most n and at least min(n, Self::len()) items from the buffer and safely drops them.

If there is no concurring producer activity then exactly min(n, Self::len()) items are removed.

Returns the number of deleted items.

let target = HeapRb::<i32>::new(8);
let (mut prod, mut cons) = target.split();

assert_eq!(prod.push_iter(&mut (0..8)), 8);

assert_eq!(cons.skip(4), 4);
assert_eq!(cons.skip(8), 4);
assert_eq!(cons.skip(8), 0);
source

pub fn clear(&mut self) -> usize

Removes all items from the buffer and safely drops them.

Returns the number of deleted items.

source§

impl<T: Copy, R: RbRef> Consumer<T, R>where R::Rb: RbRead<T>,

source

pub fn pop_slice(&mut self, elems: &mut [T]) -> usize

Removes first items from the ring buffer and writes them into a slice. Elements must be Copy.

Returns count of items been removed from the ring buffer.

source§

impl<T, R: RbRef> Consumer<T, RbWrap<RbReadCache<T, R>>>where R::Rb: RbRead<T>,

source

pub unsafe fn new_postponed(target: R) -> Self

Create new postponed consumer.

Safety

There must be only one consumer containing the same ring buffer reference.

source

pub fn sync(&mut self)

Synchronize changes with the ring buffer.

Postponed consumer requires manual synchronization to make freed space visible for the producer.

source

pub fn into_immediate(self) -> Consumer<T, R>

Synchronize and transform back to immediate consumer.

source§

impl<R: RbRef> Consumer<u8, R>where R::Rb: RbRead<u8>,

source

pub fn write_into<P: Write>( &mut self, writer: &mut P, count: Option<usize> ) -> Result<usize>

Removes at most first count bytes from the ring buffer and writes them into a Write instance. If count is None then as much as possible bytes will be written.

Returns Ok(n) if write succeeded. n is number of bytes been written. n == 0 means that either write returned zero or ring buffer is empty.

If write is failed then original error is returned. In this case it is guaranteed that no items was written to the writer. To achieve this we write only one contiguous slice at once. So this call may write less than len items even if the writer is ready to get more.

Examples found in repository?
examples/message.rs (line 45)
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
fn main() {
    let buf = HeapRb::<u8>::new(10);
    let (mut prod, mut cons) = buf.split();

    let smsg = "The quick brown fox jumps over the lazy dog";

    let pjh = thread::spawn(move || {
        println!("-> sending message: '{}'", smsg);

        let zero = [0];
        let mut bytes = smsg.as_bytes().chain(&zero[..]);
        loop {
            if prod.is_full() {
                println!("-> buffer is full, waiting");
                thread::sleep(Duration::from_millis(1));
            } else {
                let n = prod.read_from(&mut bytes, None).unwrap();
                if n == 0 {
                    break;
                }
                println!("-> {} bytes sent", n);
            }
        }

        println!("-> message sent");
    });

    let cjh = thread::spawn(move || {
        println!("<- receiving message");

        let mut bytes = Vec::<u8>::new();
        loop {
            if cons.is_empty() {
                if bytes.ends_with(&[0]) {
                    break;
                } else {
                    println!("<- buffer is empty, waiting");
                    thread::sleep(Duration::from_millis(1));
                }
            } else {
                let n = cons.write_into(&mut bytes, None).unwrap();
                println!("<- {} bytes received", n);
            }
        }

        assert_eq!(bytes.pop().unwrap(), 0);
        let msg = String::from_utf8(bytes).unwrap();
        println!("<- message received: '{}'", msg);

        msg
    });

    pjh.join().unwrap();
    let rmsg = cjh.join().unwrap();

    assert_eq!(smsg, rmsg);
}

Trait Implementations§

source§

impl<R: RbRef> Read for Consumer<u8, R>where R::Rb: RbRead<u8>,

source§

fn read(&mut self, buffer: &mut [u8]) -> Result<usize>

Pull some bytes from this source into the specified buffer, returning how many bytes were read. Read more
1.36.0 · source§

fn read_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result<usize, Error>

Like read, except that it reads into a slice of buffers. Read more
source§

fn is_read_vectored(&self) -> bool

🔬This is a nightly-only experimental API. (can_vector)
Determines if this Reader has an efficient read_vectored implementation. Read more
1.0.0 · source§

fn read_to_end(&mut self, buf: &mut Vec<u8, Global>) -> Result<usize, Error>

Read all bytes until EOF in this source, placing them into buf. Read more
1.0.0 · source§

fn read_to_string(&mut self, buf: &mut String) -> Result<usize, Error>

Read all bytes until EOF in this source, appending them to buf. Read more
1.6.0 · source§

fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), Error>

Read the exact number of bytes required to fill buf. Read more
source§

fn read_buf(&mut self, buf: BorrowedCursor<'_>) -> Result<(), Error>

🔬This is a nightly-only experimental API. (read_buf)
Pull some bytes from this source into the specified buffer. Read more
source§

fn read_buf_exact(&mut self, cursor: BorrowedCursor<'_>) -> Result<(), Error>

🔬This is a nightly-only experimental API. (read_buf)
Read the exact number of bytes required to fill cursor. Read more
1.0.0 · source§

fn by_ref(&mut self) -> &mut Selfwhere Self: Sized,

Creates a “by reference” adaptor for this instance of Read. Read more
1.0.0 · source§

fn bytes(self) -> Bytes<Self>where Self: Sized,

Transforms this Read instance to an Iterator over its bytes. Read more
1.0.0 · source§

fn chain<R>(self, next: R) -> Chain<Self, R>where R: Read, Self: Sized,

Creates an adapter which will chain this stream with another. Read more
1.0.0 · source§

fn take(self, limit: u64) -> Take<Self>where Self: Sized,

Creates an adapter which will read at most limit bytes from it. Read more

Auto Trait Implementations§

§

impl<T, R> RefUnwindSafe for Consumer<T, R>where R: RefUnwindSafe, T: RefUnwindSafe,

§

impl<T, R> Send for Consumer<T, R>where R: Send, T: Send,

§

impl<T, R> Sync for Consumer<T, R>where R: Sync, T: Sync,

§

impl<T, R> Unpin for Consumer<T, R>where R: Unpin, T: Unpin,

§

impl<T, R> UnwindSafe for Consumer<T, R>where R: UnwindSafe, T: UnwindSafe,

Blanket Implementations§

source§

impl<T> Any for Twhere T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for Twhere T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for Twhere T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T, U> Into<U> for Twhere U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T, U> TryFrom<U> for Twhere U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for Twhere U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.