pub struct Producer<T, R: RbRef>where
R::Rb: RbWrite<T>,{ /* private fields */ }
Expand description
Producer part of ring buffer.
Mode
It can operate in immediate (by default) or postponed mode.
Mode could be switched using Self::postponed
/Self::into_postponed
and Self::into_immediate
methods.
- In immediate mode removed and inserted items are automatically synchronized with the other end.
- In postponed mode synchronization occurs only when
Self::sync
orSelf::into_immediate
is called or whenSelf
is dropped. The reason to use postponed mode is that multiple subsequent operations are performed faster due to less frequent cache synchronization.
Implementations§
source§impl<T, R: RbRef> Producer<T, R>where
R::Rb: RbWrite<T>,
impl<T, R: RbRef> Producer<T, R>where R::Rb: RbWrite<T>,
sourcepub unsafe fn new(target: R) -> Self
pub unsafe fn new(target: R) -> Self
Creates producer from the ring buffer reference.
Safety
There must be only one producer containing the same ring buffer reference.
sourcepub fn into_rb_ref(self) -> R
pub fn into_rb_ref(self) -> R
Consumes self
and returns underlying ring buffer reference.
sourcepub fn postponed(&mut self) -> PostponedProducer<T, &R::Rb>
pub fn postponed(&mut self) -> PostponedProducer<T, &R::Rb>
Returns postponed producer that borrows Self
.
sourcepub fn into_postponed(self) -> PostponedProducer<T, R>
pub fn into_postponed(self) -> PostponedProducer<T, R>
Transforms Self
into postponed producer.
sourcepub fn capacity(&self) -> usize
pub fn capacity(&self) -> usize
Returns capacity of the ring buffer.
The capacity of the buffer is constant.
sourcepub fn is_full(&self) -> bool
pub fn is_full(&self) -> bool
Checks if the ring buffer is full.
The result may become irrelevant at any time because of concurring consumer activity.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
let zero = [0];
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
let n = prod.read_from(&mut bytes, None).unwrap();
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
msg
});
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
sourcepub fn len(&self) -> usize
pub fn len(&self) -> usize
The number of items stored in the buffer.
Actual number may be less than the returned value because of concurring consumer activity.
sourcepub fn free_len(&self) -> usize
pub fn free_len(&self) -> usize
The number of remaining free places in the buffer.
Actual number may be greater than the returning value because of concurring consumer activity.
sourcepub unsafe fn free_space_as_slices(
&mut self
) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
pub unsafe fn free_space_as_slices( &mut self ) -> (&mut [MaybeUninit<T>], &mut [MaybeUninit<T>])
Provides a direct access to the ring buffer vacant memory. Returns a pair of slices of uninitialized memory, the second one may be empty.
Safety
Vacant memory is uninitialized. Initialized items must be put starting from the beginning of first slice. When first slice is fully filled then items must be put to the beginning of the second slice.
This method must be followed by Self::advance
call with the number of items being put previously as argument.
No other mutating calls allowed before that.
sourcepub fn push(&mut self, elem: T) -> Result<(), T>
pub fn push(&mut self, elem: T) -> Result<(), T>
Appends an item to the ring buffer.
On failure returns an Err
containing the item that hasn’t been appended.
Examples found in repository?
More examples
3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
fn main() {
let rb = HeapRb::<i32>::new(2);
let (mut prod, mut cons) = rb.split();
prod.push(0).unwrap();
prod.push(1).unwrap();
assert_eq!(prod.push(2), Err(2));
assert_eq!(cons.pop().unwrap(), 0);
prod.push(2).unwrap();
assert_eq!(cons.pop().unwrap(), 1);
assert_eq!(cons.pop().unwrap(), 2);
assert_eq!(cons.pop(), None);
}
sourcepub fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> usize
pub fn push_iter<I: Iterator<Item = T>>(&mut self, iter: &mut I) -> usize
Appends items from an iterator to the ring buffer. Elements that haven’t been added to the ring buffer remain in the iterator.
Returns count of items been appended to the ring buffer.
Inserted items are committed to the ring buffer all at once in the end, e.g. when buffer is full or iterator has ended.
source§impl<T: Copy, R: RbRef> Producer<T, R>where
R::Rb: RbWrite<T>,
impl<T: Copy, R: RbRef> Producer<T, R>where R::Rb: RbWrite<T>,
sourcepub fn push_slice(&mut self, elems: &[T]) -> usize
pub fn push_slice(&mut self, elems: &[T]) -> usize
Appends items from slice to the ring buffer.
Elements must be Copy
.
Returns count of items been appended to the ring buffer.
source§impl<T, R: RbRef> Producer<T, RbWrap<RbWriteCache<T, R>>>where
R::Rb: RbWrite<T>,
impl<T, R: RbRef> Producer<T, RbWrap<RbWriteCache<T, R>>>where R::Rb: RbWrite<T>,
sourcepub unsafe fn new_postponed(target: R) -> Self
pub unsafe fn new_postponed(target: R) -> Self
Create new postponed producer.
Safety
There must be only one producer containing the same ring buffer reference.
sourcepub fn sync(&mut self)
pub fn sync(&mut self)
Synchronize changes with the ring buffer.
Postponed producer requires manual synchronization to make pushed items visible for the consumer.
sourcepub fn into_immediate(self) -> Producer<T, R>
pub fn into_immediate(self) -> Producer<T, R>
Synchronize and transform back to immediate producer.
source§impl<R: RbRef> Producer<u8, R>where
R::Rb: RbWrite<u8>,
impl<R: RbRef> Producer<u8, R>where R::Rb: RbWrite<u8>,
sourcepub fn read_from<P: Read>(
&mut self,
reader: &mut P,
count: Option<usize>
) -> Result<usize>
pub fn read_from<P: Read>( &mut self, reader: &mut P, count: Option<usize> ) -> Result<usize>
Reads at most count
bytes from Read
instance and appends them to the ring buffer.
If count
is None
then as much as possible bytes will be read.
Returns Ok(n)
if read
succeeded. n
is number of bytes been read.
n == 0
means that either read
returned zero or ring buffer is full.
If read
is failed then original error is returned. In this case it is guaranteed that no items was read from the reader.
To achieve this we read only one contiguous slice at once. So this call may read less than remaining
items in the buffer even if the reader is ready to provide more.
Examples found in repository?
5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
fn main() {
let buf = HeapRb::<u8>::new(10);
let (mut prod, mut cons) = buf.split();
let smsg = "The quick brown fox jumps over the lazy dog";
let pjh = thread::spawn(move || {
println!("-> sending message: '{}'", smsg);
let zero = [0];
let mut bytes = smsg.as_bytes().chain(&zero[..]);
loop {
if prod.is_full() {
println!("-> buffer is full, waiting");
thread::sleep(Duration::from_millis(1));
} else {
let n = prod.read_from(&mut bytes, None).unwrap();
if n == 0 {
break;
}
println!("-> {} bytes sent", n);
}
}
println!("-> message sent");
});
let cjh = thread::spawn(move || {
println!("<- receiving message");
let mut bytes = Vec::<u8>::new();
loop {
if cons.is_empty() {
if bytes.ends_with(&[0]) {
break;
} else {
println!("<- buffer is empty, waiting");
thread::sleep(Duration::from_millis(1));
}
} else {
let n = cons.write_into(&mut bytes, None).unwrap();
println!("<- {} bytes received", n);
}
}
assert_eq!(bytes.pop().unwrap(), 0);
let msg = String::from_utf8(bytes).unwrap();
println!("<- message received: '{}'", msg);
msg
});
pjh.join().unwrap();
let rmsg = cjh.join().unwrap();
assert_eq!(smsg, rmsg);
}
Trait Implementations§
source§impl<R: RbRef> Write for Producer<u8, R>where
R::Rb: RbWrite<u8>,
impl<R: RbRef> Write for Producer<u8, R>where R::Rb: RbWrite<u8>,
source§fn write(&mut self, buffer: &[u8]) -> Result<usize>
fn write(&mut self, buffer: &[u8]) -> Result<usize>
source§fn flush(&mut self) -> Result<()>
fn flush(&mut self) -> Result<()>
source§fn is_write_vectored(&self) -> bool
fn is_write_vectored(&self) -> bool
can_vector
)1.0.0 · source§fn write_all(&mut self, buf: &[u8]) -> Result<(), Error>
fn write_all(&mut self, buf: &[u8]) -> Result<(), Error>
source§fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>
fn write_all_vectored(&mut self, bufs: &mut [IoSlice<'_>]) -> Result<(), Error>
write_all_vectored
)