chute/
reader.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
use branch_hints::unlikely;
use std::sync::atomic::Ordering;
use crate::block::{BlockArc, BLOCK_SIZE};
use crate::lending_iterator::LendingIterator;

// TODO: next_slice()
// TODO: Clone
/// Queue consumer.
/// 
/// Reader returns `&T` with `&mut self` lifetime. This means you should deal 
/// with message BEFORE consuming the next one. 
/// Because of this, it does not implement [Iterator]. But [ClonedReader] does.
///
/// Constructed by [Queue::reader()].
///
///[Queue::reader()]: crate::spmc::Queue::reader 
/// 
/// # Design choices
/// 
/// The value returned by the reader lives as long as the block where it is stored.
/// From the reader's point of view, we can guarantee that the value remains valid
/// as long as the block does not change. However, in Rust, we cannot make such 
/// granular guarantees. Instead, we guarantee that the value remains valid until the
/// reader is mutated. This means the value is guaranteed to live until the next 
/// read operation, at which point the block may change, and the old block could 
/// be destructed.
pub struct Reader<T>{
    pub(crate) block: BlockArc<T>,
    pub(crate) index: usize,
    pub(crate) len  : usize,
}

impl<T> Reader<T> {
    #[inline]
    pub fn cloned(self) -> ClonedReader<T> {
        ClonedReader{reader: self}
    } 
}

impl<T> LendingIterator for Reader<T>{
    type Item<'a> = &'a T where Self: 'a;

    #[inline]
    fn next(&mut self) -> Option<Self::Item<'_>> {
        if self.index == self.len {
            if unlikely(self.len == BLOCK_SIZE) {
                // fetch next block, release current
                if let Some(next_block) = self.block.try_load_next(Ordering::Acquire) {
                    self.index = 0;
                    self.len   = next_block.len.load(Ordering::Acquire);
                    self.block = next_block;
                    
                    // TODO: Disallow empty blocks?
                    if self.len == 0 {
                        return None;
                    }
                } else {
                    return None;
                }
            } else {
                // Reread len.
                // This is synchronization point. `mem` data should be in 
                // current thread visibility, after `len` atomic load. 
                // In analogue with spin-lock.
                let block_len = self.block.len.load(Ordering::Acquire);
                if self.len == block_len {
                    // nothing changed.
                    return None;
                } 
                self.len = block_len;
            }
        }
        
        unsafe{
            let value = &*self.block.mem().add(self.index);
            self.index += 1;
            Some(value)
        }
    }
}

/// Cloning queue consumer.
/// 
/// Reader that clones `T` upon return. Implements [Iterator].
///
/// Constructed by [Reader::cloned()]. 
pub struct ClonedReader<T>{
    reader: Reader<T>   
}
impl<T: Clone> Iterator for ClonedReader<T> {
    type Item = T;

    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        self.reader.next().cloned()
    }
}