// chute/reader.rs
use branch_hints::unlikely;
use std::sync::atomic::Ordering;
use crate::block::{BlockArc, BLOCK_SIZE};
use crate::lending_iterator::LendingIterator;
// TODO: next_slice()
// TODO: Clone
/// Consuming end of the queue.
///
/// Each item is handed out as a `&T` whose lifetime is tied to the `&mut self`
/// borrow of the reader, so a message has to be fully processed BEFORE the
/// next one is requested. For that reason `Reader` cannot implement
/// [Iterator]; use [ClonedReader] when a regular iterator is needed.
///
/// Obtained from [Queue::reader()].
///
///[Queue::reader()]: crate::spmc::Queue::reader
///
/// # Design choices
///
/// A value handed out by the reader is stored inside a block and lives exactly
/// as long as that block does. The reader itself could promise that the value
/// stays valid for as long as it keeps pointing at the same block, but Rust
/// cannot express a guarantee that fine-grained. The promise made instead is
/// coarser: the value stays valid until the reader is mutated. In practice
/// this means a value is guaranteed to live until the next read operation,
/// because that is the moment the reader may switch blocks and the previous
/// block may be destructed.
pub struct Reader<T> {
    /// Block the reader is currently positioned in.
    pub(crate) block: BlockArc<T>,
    /// Position inside `block` of the next element to hand out.
    pub(crate) index: usize,
    /// Last value of the block's `len` observed by this reader.
    pub(crate) len: usize,
}
impl<T> Reader<T> {
    /// Consumes the reader and wraps it into a [ClonedReader], which returns
    /// owned clones of the items instead of references.
    #[inline]
    pub fn cloned(self) -> ClonedReader<T> {
        let reader = self;
        ClonedReader { reader }
    }
}
impl<T> LendingIterator for Reader<T> {
    type Item<'a> = &'a T where Self: 'a;

    /// Returns a reference to the next unread element, or `None` when no new
    /// element is visible to this reader yet.
    ///
    /// The returned reference borrows `self` mutably, so it must be released
    /// before `next` can be called again.
    #[inline]
    fn next(&mut self) -> Option<Self::Item<'_>> {
        // Everything known to be readable has been consumed: look for more.
        if self.index == self.len {
            if unlikely(self.len == BLOCK_SIZE) {
                // The current block is completely consumed. Move on to its
                // successor, if one is already linked; otherwise report "empty".
                let successor = self.block.try_load_next(Ordering::Acquire)?;
                self.index = 0;
                self.len = successor.len.load(Ordering::Acquire);
                // Replacing the handle lets go of the previous block.
                self.block = successor;
                // TODO: Disallow empty blocks?
                if self.len == 0 {
                    return None;
                }
            } else {
                // Refresh the cached length.
                // This load is the synchronization point: once `len` has been
                // read, the element data in `mem` is expected to be visible to
                // the current thread, similar to acquiring a spin-lock.
                let published = self.block.len.load(Ordering::Acquire);
                if published == self.len {
                    // Nothing new since the previous read.
                    return None;
                }
                self.len = published;
            }
        }

        let slot = self.index;
        self.index = slot + 1;
        // SAFETY: `slot` addresses an element below the `len` value loaded from
        // the block, given that `index` never runs past `len`.
        // NOTE(review): this presumes the block's `len` only grows and that a
        // slot is initialized before the `len` covering it is published —
        // confirm against the writer side.
        Some(unsafe { &*self.block.mem().add(slot) })
    }
}
/// Queue consumer that yields owned values.
///
/// Wraps a [Reader] and clones every `T` before returning it, which makes it
/// possible to implement [Iterator].
///
/// Obtained from [Reader::cloned()].
pub struct ClonedReader<T> {
    /// The wrapped borrowing reader that the items are cloned from.
    reader: Reader<T>,
}
impl<T: Clone> Iterator for ClonedReader<T> {
    type Item = T;

    /// Reads the next element from the wrapped reader and returns a clone of
    /// it, or `None` when the wrapped reader has nothing to return.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        let borrowed = self.reader.next()?;
        Some(T::clone(borrowed))
    }
}