Struct ringbuf::producer::Producer

pub struct Producer<T, R: RbRef>
where
    R::Rb: RbWrite<T>,
{ /* private fields */ }

The producer part of a ring buffer.

Mode

A producer operates in either immediate mode (the default) or postponed mode. Switch modes with Self::postponed / Self::into_postponed and Self::into_immediate.

  • In immediate mode, removed and inserted items are automatically synchronized with the other end.
  • In postponed mode, synchronization occurs only when Self::sync or Self::into_immediate is called, or when Self is dropped. Postponed mode makes a run of consecutive operations faster because the cache is synchronized less often.
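
The difference between the two modes can be pictured with a toy model. The sketch below is not the crate's implementation: `SharedTail` and `PostponedWriter` are hypothetical names, and the shared state is reduced to a single atomic counter. It only illustrates that pushes stay invisible to the other end until a sync, and that dropping the writer also publishes them.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical stand-in for the write position that the consumer observes.
struct SharedTail(AtomicUsize);

/// Toy postponed writer: pushes only bump a local counter.
struct PostponedWriter<'a> {
    shared: &'a SharedTail,
    local_tail: usize,
}

impl<'a> PostponedWriter<'a> {
    fn new(shared: &'a SharedTail) -> Self {
        let local_tail = shared.0.load(Ordering::Acquire);
        Self { shared, local_tail }
    }

    /// Records one pushed item without touching the shared counter.
    fn push(&mut self) {
        self.local_tail += 1;
    }

    /// Publishes every push made so far with a single atomic store.
    fn sync(&mut self) {
        self.shared.0.store(self.local_tail, Ordering::Release);
    }
}

impl Drop for PostponedWriter<'_> {
    /// Dropping the writer publishes whatever is still pending.
    fn drop(&mut self) {
        self.sync();
    }
}

/// Returns the tail seen by the other end before a sync, after a sync,
/// and after the writer has been dropped.
fn observe() -> (usize, usize, usize) {
    let shared = SharedTail(AtomicUsize::new(0));
    let mut writer = PostponedWriter::new(&shared);

    writer.push();
    writer.push();
    writer.push();
    let before_sync = shared.0.load(Ordering::Acquire);

    writer.sync();
    let after_sync = shared.0.load(Ordering::Acquire);

    writer.push();
    drop(writer);
    let after_drop = shared.0.load(Ordering::Acquire);

    (before_sync, after_sync, after_drop)
}

fn main() {
    assert_eq!(observe(), (0, 3, 4));
}
```

In this model an immediate writer would perform the atomic store inside every `push`, which is the per-operation cost that postponed mode is meant to avoid.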

Implementations

impl<T, R: RbRef> Producer<T, R> where R::Rb: RbWrite<T>,

pub unsafe fn new(target: R) -> Self

Creates a producer from a ring buffer reference.

Safety

There must be only one producer containing the same ring buffer reference.

pub fn rb(&self) -> &R::Rb

Returns a reference to the underlying ring buffer.

pub fn into_rb_ref(self) -> R

Consumes self and returns the underlying ring buffer reference.

pub fn postponed(&mut self) -> PostponedProducer<T, &R::Rb>

Returns a postponed producer that borrows Self.

pub fn into_postponed(self) -> PostponedProducer<T, R>

Transforms Self into a postponed producer.

pub fn capacity(&self) -> usize

Returns the capacity of the ring buffer.

The capacity of the buffer is constant.

pub fn is_empty(&self) -> bool

Checks if the ring buffer is empty.

pub fn is_full(&self) -> bool

Checks if the ring buffer is full.

The result may become outdated at any time because of concurrent consumer activity.

Example from the repository, examples/message.rs:

// Imports needed by the excerpt below.
use ringbuf::HeapRb;
use std::{io::Read, thread, time::Duration};

fn main() {
    let buf = HeapRb::<u8>::new(10);
    let (mut prod, mut cons) = buf.split();

    let smsg = "The quick brown fox jumps over the lazy dog";

    let pjh = thread::spawn(move || {
        println!("-> sending message: '{}'", smsg);

        let zero = [0];
        let mut bytes = smsg.as_bytes().chain(&zero[..]);
        loop {
            if prod.is_full() {
                println!("-> buffer is full, waiting");
                thread::sleep(Duration::from_millis(1));
            } else {
                let n = prod.read_from(&mut bytes, None).unwrap();
                if n == 0 {
                    break;
                }
                println!("-> {} bytes sent", n);
            }
        }

        println!("-> message sent");
    });

    let cjh = thread::spawn(move || {
        println!("<- receiving message");

        let mut bytes = Vec::<u8>::new();
        loop {
            if cons.is_empty() {
                if bytes.ends_with(&[0]) {
                    break;
                } else {
                    println!("<- buffer is empty, waiting");
                    thread::sleep(Duration::from_millis(1));
                }
            } else {
                let n = cons.write_into(&mut bytes, None).unwrap();
                println!("<- {} bytes received", n);
            }
        }

        assert_eq!(bytes.pop().unwrap(), 0);
        let msg = String::from_utf8(bytes).unwrap();
        println!("<- message received: '{}'", msg);

        msg
    });

    pjh.join().unwrap();
    let rmsg = cjh.join().unwrap();

    assert_eq!(smsg, rmsg);
}

pub fn len(&self) -> usize

Returns the number of items stored in the buffer.

The actual number may be less than the returned value because of concurrent consumer activity.

pub fn free_len(&self) -> usize

Returns the number of remaining free places in the buffer.

The actual number may be greater than the returned value because of concurrent consumer activity.

pub unsafe fn free_space_as_slices(&mut self) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])

Provides direct access to the vacant memory of the ring buffer. Returns a pair of slices of uninitialized memory; the second one may be empty.

Safety

Vacant memory is uninitialized. Initialized items must be written starting from the beginning of the first slice. Once the first slice is completely filled, items must be written from the beginning of the second slice.

This method must be followed by a Self::advance call whose argument is the number of items written. No other mutating calls are allowed before that.

pub unsafe fn advance(&mut self, count: usize)

Moves the tail counter forward by count places.

Safety

The first count items in the free space must be initialized.
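
The two slices and the required fill order can be modeled with plain indices. This is an illustrative sketch only: `vacant_ranges` and `fill` are hypothetical helpers that work on an already initialized `u8` array, not on the crate's `MaybeUninit`-based storage, and the `head`/`len` bookkeeping is an assumption made for the example.

```rust
use std::ops::Range;

/// Index ranges of the vacant cells of a ring with `capacity` cells,
/// `len` stored items, and the oldest item at index `head`.
/// The second range is empty unless the vacant run wraps around.
fn vacant_ranges(capacity: usize, head: usize, len: usize) -> (Range<usize>, Range<usize>) {
    assert!(capacity > 0 && head < capacity && len <= capacity);
    let tail = (head + len) % capacity;
    let vacant = capacity - len;
    let first = tail..(tail + vacant).min(capacity);
    let second = 0..(vacant - first.len());
    (first, second)
}

/// Writes `items` into the vacant cells, first range first, and returns
/// how many were written. That count is what the tail would advance by.
fn fill(storage: &mut [u8], head: usize, len: usize, items: &[u8]) -> usize {
    let (first, second) = vacant_ranges(storage.len(), head, len);
    let mut count = 0;
    for (idx, &item) in first.chain(second).zip(items) {
        storage[idx] = item;
        count += 1;
    }
    count
}

fn main() {
    let mut storage = [0u8; 8];
    // Items occupy cells 2, 3, 4, so cells 5, 6, 7 and then 0, 1 are vacant.
    assert_eq!(vacant_ranges(8, 2, 3), (5..8, 0..2));

    let written = fill(&mut storage, 2, 3, &[10, 11, 12, 13]);
    assert_eq!(written, 4);
    assert_eq!(storage, [13, 0, 0, 0, 0, 10, 11, 12]);
}
```

The fourth item lands in cell 0 only after cells 5 to 7 are full, which mirrors the rule that the second slice is used once the first is completely filled.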

pub fn push(&mut self, elem: T) -> Result<(), T>

Appends an item to the ring buffer.

On failure, returns an Err containing the item that was not appended.

Example from the repository, examples/static.rs:

// Import needed by the excerpt below.
use ringbuf::StaticRb;

fn main() {
    const RB_SIZE: usize = 1;
    let mut rb = StaticRb::<i32, RB_SIZE>::default();
    let (mut prod, mut cons) = rb.split_ref();

    assert_eq!(prod.push(123), Ok(()));
    assert_eq!(prod.push(321), Err(321));

    assert_eq!(cons.pop(), Some(123));
    assert_eq!(cons.pop(), None);
}

Example from the repository, examples/simple.rs:

// Import needed by the excerpt below.
use ringbuf::HeapRb;

fn main() {
    let rb = HeapRb::<i32>::new(2);
    let (mut prod, mut cons) = rb.split();

    prod.push(0).unwrap();
    prod.push(1).unwrap();
    assert_eq!(prod.push(2), Err(2));

    assert_eq!(cons.pop().unwrap(), 0);

    prod.push(2).unwrap();

    assert_eq!(cons.pop().unwrap(), 1);
    assert_eq!(cons.pop().unwrap(), 2);
    assert_eq!(cons.pop(), None);
}

pub fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> usize

Appends items from an iterator to the ring buffer. Elements that have not been added to the ring buffer remain in the iterator.

Returns the number of items appended to the ring buffer.

Inserted items are committed to the ring buffer all at once at the end, i.e. when the buffer is full or the iterator is exhausted.
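
The "remaining items stay in the iterator" behavior can be sketched with a capacity-limited `Vec`. The helper `push_iter_bounded` is hypothetical and only models the contract described above; it does not model the single commit at the end and is not the crate's implementation.

```rust
/// Hypothetical bounded push: moves items from `iter` into `buf` until
/// `buf` holds `capacity` items or the iterator ends. Returns the count.
fn push_iter_bounded<T, I: Iterator<Item = T>>(
    buf: &mut Vec<T>,
    capacity: usize,
    iter: &mut I,
) -> usize {
    let mut count = 0;
    // Check for room before pulling an item, so that nothing is taken
    // from the iterator and then lost.
    while buf.len() < capacity {
        match iter.next() {
            Some(item) => {
                buf.push(item);
                count += 1;
            }
            None => break,
        }
    }
    count
}

fn main() {
    let mut buf = Vec::new();
    let mut items = 0..5;

    assert_eq!(push_iter_bounded(&mut buf, 3, &mut items), 3);
    assert_eq!(buf, [0, 1, 2]);
    // The items that did not fit are still available to the caller.
    assert_eq!(items.next(), Some(3));
}
```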

impl<T: Copy, R: RbRef> Producer<T, R> where R::Rb: RbWrite<T>,

pub fn push_slice(&mut self, elems: &[T]) -> usize

Appends items from a slice to the ring buffer. Elements must be Copy.

Returns the number of items appended to the ring buffer.
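
Because the returned count may be smaller than the slice length, callers typically loop over the unsent remainder. The sketch below shows that pattern against a hypothetical `Bounded` queue, a stand-in with a `push_slice` of the same shape rather than the crate's type; the drain inside the loop plays the role of a consumer.

```rust
use std::collections::VecDeque;

/// Hypothetical bounded byte queue standing in for the ring buffer.
struct Bounded {
    items: VecDeque<u8>,
    capacity: usize,
}

impl Bounded {
    /// Copies as many leading elements as fit and returns that count.
    fn push_slice(&mut self, elems: &[u8]) -> usize {
        let n = elems.len().min(self.capacity - self.items.len());
        self.items.extend(&elems[..n]);
        n
    }
}

/// Sends all of `data` through `buf`, draining into `out` after every
/// call. Returns the number of `push_slice` calls that were needed.
fn send_all(buf: &mut Bounded, mut data: &[u8], out: &mut Vec<u8>) -> usize {
    assert!(buf.capacity > 0, "a zero-capacity buffer can never make progress");
    let mut calls = 0;
    while !data.is_empty() {
        let n = buf.push_slice(data);
        data = &data[n..]; // keep only what was not accepted
        calls += 1;
        out.extend(buf.items.drain(..)); // stand-in for the consumer
    }
    calls
}

fn main() {
    let mut buf = Bounded { items: VecDeque::new(), capacity: 4 };
    let mut out = Vec::new();

    // Ten bytes through a four-byte buffer take three calls: 4 + 4 + 2.
    let calls = send_all(&mut buf, b"0123456789", &mut out);
    assert_eq!(calls, 3);
    assert_eq!(out, b"0123456789");
}
```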

impl<T, R: RbRef> Producer<T, RbWrap<RbWriteCache<T, R>>> where R::Rb: RbWrite<T>,

pub unsafe fn new_postponed(target: R) -> Self

Creates a new postponed producer.

Safety

There must be only one producer containing the same ring buffer reference.

pub fn sync(&mut self)

Synchronizes changes with the ring buffer.

A postponed producer requires manual synchronization to make pushed items visible to the consumer.

pub fn discard(&mut self)

Drops the items inserted since the last synchronization without publishing them.

pub fn into_immediate(self) -> Producer<T, R>

Synchronizes and transforms back into an immediate producer.
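
How sync, discard, and the final synchronization interact can be sketched with two vectors. `Staged` and its methods are hypothetical names chosen for this illustration; the model assumes that pending items are simply held aside until published, which may differ from how the crate stores them.

```rust
/// Hypothetical two-stage store: `pending` is private to the producer,
/// `published` is what the consumer would be able to see.
struct Staged<T> {
    published: Vec<T>,
    pending: Vec<T>,
}

impl<T> Staged<T> {
    fn new() -> Self {
        Self { published: Vec::new(), pending: Vec::new() }
    }

    /// Stages an item; the consumer cannot see it yet.
    fn push(&mut self, item: T) {
        self.pending.push(item);
    }

    /// Publishes everything staged since the last synchronization.
    fn sync(&mut self) {
        self.published.append(&mut self.pending);
    }

    /// Drops everything staged since the last synchronization.
    fn discard(&mut self) {
        self.pending.clear();
    }

    /// Synchronizes one last time and hands back the published items.
    fn into_published(mut self) -> Vec<T> {
        self.sync();
        self.published
    }
}

fn main() {
    let mut staged = Staged::new();
    staged.push(1);
    staged.push(2);
    assert!(staged.published.is_empty()); // nothing visible before sync

    staged.sync();
    staged.push(3);
    staged.discard(); // 3 is dropped and never published
    staged.push(4);

    assert_eq!(staged.into_published(), [1, 2, 4]);
}
```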

impl<R: RbRef> Producer<u8, R> where R::Rb: RbWrite<u8>,

pub fn read_from<P: Read>(&mut self, reader: &mut P, count: Option<usize>) -> Result<usize>

Reads at most count bytes from a Read instance and appends them to the ring buffer. If count is None, as many bytes as possible are read.

Returns Ok(n) if the read succeeded, where n is the number of bytes read. n == 0 means that either the read returned zero or the ring buffer is full.

If the read fails, the original error is returned, and it is guaranteed that no items were read from the reader. To achieve this, only one contiguous slice is read at a time, so this call may read fewer bytes than the buffer has room for even if the reader is ready to provide more.
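
The "one contiguous slice per call" strategy can be sketched with the standard `Read` trait. `read_into_vacant` is a hypothetical helper over a plain byte array, with `tail` and `vacant` passed in explicitly as assumptions of the example; it is not the crate's code. A single `read` call means an error cannot follow a partial transfer.

```rust
use std::io::{self, Read};

/// Reads into the first contiguous vacant run of `storage` only.
/// `tail` is the index of the first vacant cell and `vacant` the total
/// number of vacant cells, which may wrap past the end of `storage`.
fn read_into_vacant<R: Read>(
    reader: &mut R,
    storage: &mut [u8],
    tail: usize,
    vacant: usize,
    count: Option<usize>,
) -> io::Result<usize> {
    assert!(tail <= storage.len() && vacant <= storage.len());
    // Stop at the end of the array instead of wrapping around.
    let run = vacant.min(storage.len() - tail);
    let limit = count.map_or(run, |c| c.min(run));
    // Exactly one `read` call: if it fails, nothing was consumed.
    reader.read(&mut storage[tail..tail + limit])
}

fn main() -> io::Result<()> {
    let mut storage = [0u8; 8];
    let mut reader: &[u8] = b"hello";

    // Five cells are vacant (6, 7, 0, 1, 2), but only two are contiguous.
    let n = read_into_vacant(&mut reader, &mut storage, 6, 5, None)?;
    assert_eq!(n, 2);
    assert_eq!(&storage[6..], b"he");
    // The rest of the input is untouched and can be read by a later call.
    assert_eq!(reader, b"llo");
    Ok(())
}
```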

Example from the repository: examples/message.rs, listed in full under is_full above, calls read_from in its sending loop.

Trait Implementations

impl<R: RbRef> core::fmt::Write for Producer<u8, R> where R::Rb: RbWrite<u8>,

fn write_str(&mut self, s: &str) -> Result

Writes a string slice into this writer, returning whether the write succeeded.

1.1.0

fn write_char(&mut self, c: char) -> Result<(), Error>

Writes a char into this writer, returning whether the write succeeded.

1.0.0

fn write_fmt(&mut self, args: Arguments<'_>) -> Result<(), Error>

Glue for usage of the write! macro with implementors of this trait.

impl<R: RbRef> std::io::Write for Producer<u8, R> where R::Rb: RbWrite<u8>,

fn write(&mut self, buffer: &[u8]) -> Result<usize>

Write a buffer into this writer, returning how many bytes were written.

fn flush(&mut self) -> Result<()>

Flush this output stream, ensuring that all intermediately buffered contents reach their destination.

1.36.0

fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result<usize, Error>

Like write, except that it writes from a slice of buffers.

fn is_write_vectored(&self) -> bool

🔬This is a nightly-only experimental API. (can_vector)
Determines if this Writer has an efficient write_vectored implementation.

1.0.0

fn write_all(&mut self, buf: &[u8]) -> Result<(), Error>

Attempts to write an entire buffer into this writer.

fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>

🔬This is a nightly-only experimental API. (write_all_vectored)
Attempts to write multiple buffers into this writer.

1.0.0

fn write_fmt(&mut self, fmt: Arguments<'_>) -> Result<(), Error>

Writes a formatted string into this writer, returning any error encountered.

1.0.0

fn by_ref(&mut self) -> &mut Self where Self: Sized,

Creates a “by reference” adapter for this instance of Write.

Auto Trait Implementations

impl<T, R> RefUnwindSafe for Producer<T, R> where R: RefUnwindSafe, T: RefUnwindSafe,

impl<T, R> Send for Producer<T, R> where R: Send, T: Send,

impl<T, R> Sync for Producer<T, R> where R: Sync, T: Sync,

impl<T, R> Unpin for Producer<T, R> where R: Unpin, T: Unpin,

impl<T, R> UnwindSafe for Producer<T, R> where R: UnwindSafe, T: UnwindSafe,

Blanket Implementations

impl<T> Any for T where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T where T: ?Sized,

const: unstable

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T where T: ?Sized,

const: unstable

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

const: unstable

fn from(t: T) -> T

Returns the argument unchanged.

impl<T, U> Into<U> for T where U: From<T>,

const: unstable

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T, U> TryFrom<U> for T where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

const: unstable

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

const: unstable

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.