parasol_db/view/
composite.rs

1use crate::{Seq, View};
2
3#[derive(Clone)]
4pub struct CompositeView<V: View> {
5    views: Vec<V>,
6    vector_clock: Vec<Seq>,
7}
8
9impl<V: View> CompositeView<V> {
10    pub fn new(views: Vec<V>) -> Self {
11        let vector_clock = vec![0; views.len()];
12        Self { views, vector_clock }
13    }
14
15    pub fn vector_clock_update(&mut self, node_id: usize, seq: Seq) {
16        self.vector_clock[node_id] = seq;
17    }
18
19    pub fn views_mut(&mut self) -> &mut Vec<V> {
20        &mut self.views
21    }
22}
23
24impl<V: View> View for CompositeView<V>
25where
26    for<'a> V::Iterator: Clone,
27{
28    type Event = V::Event;
29    type Iterator = CompositeViewIterator<V>;
30
31    fn scan(&mut self, start: Seq, end: Seq) -> Self::Iterator {
32        CompositeViewIterator::new(self, start, end)
33    }
34
35    fn get_current_seq(&mut self) -> Seq {
36        // current seq for the purposes of reading is the minimum of sequences in the vector clock.
37        // the entry for a vector clock is only updated by a transmission from that node, which is a promise not to
38        // assign lower sequence numbers to writes, so that the events before the minimum sequence number are immutable
39        self.vector_clock.iter().min().copied().unwrap_or_default()
40    }
41}
42
43pub struct CompositeViewIterator<V: View> {
44    iterators: Vec<V::Iterator>,
45}
46
47impl<'iter, V: View> CompositeViewIterator<V>
48where
49    V::Iterator: Clone,
50{
51    fn new(view: &'iter mut CompositeView<V>, start: Seq, end: Seq) -> Self {
52        // iterate each constituent view
53        Self {
54            iterators: view
55                .views
56                .iter_mut()
57                .map(|view| view.scan(start, end))
58                .collect(),
59        }
60    }
61}
62
63impl<V: View> Iterator for CompositeViewIterator<V>
64where
65    V::Iterator: Clone,
66{
67    type Item = (Seq, V::Event);
68
69    fn next(&mut self) -> Option<Self::Item> {
70        let min_seq_idx = {
71            // clone iterators
72            let mut iterators = self.iterators.iter().cloned().collect::<Vec<_>>();
73
74            // which iterator has the next event with the lowest sequence number?
75            let mut min_seq = Seq::MAX;
76            let mut min_seq_idx = None;
77            for (idx, iter) in iterators.iter_mut().enumerate() {
78                if let Some((seq, _)) = iter.next() {
79                    // if there are multiple, prefer the lowest node index (break ties by node id)
80                    if seq < min_seq {
81                        min_seq = seq;
82                        min_seq_idx = Some(idx);
83                    }
84                }
85            }
86
87            min_seq_idx
88        };
89
90        // advance the iterator with the lowest sequence number and return the result if there is one
91        min_seq_idx.and_then(|idx| self.iterators[idx].next())
92    }
93}
94
95impl<V: View> DoubleEndedIterator for CompositeViewIterator<V>
96where
97    V::Iterator: Clone,
98{
99    fn next_back(&mut self) -> Option<Self::Item> {
100        let max_seq_idx = {
101            // clone iterators
102            let mut iterators = self.iterators.iter().cloned().collect::<Vec<_>>();
103
104            // which iterator has the next event with the highest sequence number?
105            let mut max_seq = Seq::MIN;
106            let mut max_seq_idx = None;
107            for (idx, iter) in iterators.iter_mut().enumerate() {
108                if let Some((seq, _)) = iter.next_back() {
109                    // if there are multiple, prefer the highest node index (break ties by node id)
110                    if seq >= max_seq {
111                        max_seq = seq;
112                        max_seq_idx = Some(idx);
113                    }
114                }
115            }
116
117            max_seq_idx
118        };
119
120        // advance the iterator with the highest sequence number and return the result if there is one
121        max_seq_idx.and_then(|idx| self.iterators[idx].next_back())
122    }
123}
124
125#[cfg(test)]
126mod tests {
127    use super::CompositeView;
128    use crate::table::vec::VecTable;
129    use crate::{Seq, Table, View};
130
131    #[test]
132    fn scan_none() {
133        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
134        assert_eq!(composite.get_current_seq(), 0);
135        assert_eq!(
136            composite
137                .scan(Seq::MIN, Seq::MAX)
138                .map(|(_, event)| event)
139                .collect::<Vec<i32>>(),
140            Vec::<i32>::new()
141        );
142    }
143
144    #[test]
145    fn scan_one() {
146        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
147
148        composite.views[0].append([12]);
149
150        assert_eq!(composite.get_current_seq(), 0);
151        assert_eq!(
152            composite
153                .scan(Seq::MIN, Seq::MAX)
154                .map(|(_, event)| event)
155                .collect::<Vec<i32>>(),
156            vec![12]
157        );
158    }
159
160    #[test]
161    fn scan_multiple_one_node() {
162        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
163
164        composite.views[0].append([12, 34, 56]);
165
166        assert_eq!(composite.get_current_seq(), 0);
167        assert_eq!(
168            composite
169                .scan(Seq::MIN, Seq::MAX)
170                .map(|(_, event)| event)
171                .collect::<Vec<i32>>(),
172            vec![12, 34, 56]
173        );
174    }
175
176    #[test]
177    fn scan_multiple_multiple_nodes() {
178        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
179
180        composite.views[0].append([12]);
181        composite.views[1].append([34]);
182        composite.views[2].append([56]);
183
184        assert_eq!(composite.get_current_seq(), 0);
185        assert_eq!(
186            composite
187                .scan(Seq::MIN, Seq::MAX)
188                .map(|(_, event)| event)
189                .collect::<Vec<i32>>(),
190            vec![12, 34, 56]
191        );
192    }
193
194    #[test]
195    fn scan_multiple_each_multiple_nodes() {
196        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
197
198        composite.views[0].append([12, 56]);
199        composite.views[1].append([34, 90]);
200        composite.views[2].append([78]);
201
202        assert_eq!(composite.get_current_seq(), 0);
203        assert_eq!(
204            composite
205                .scan(Seq::MIN, Seq::MAX)
206                .map(|(_, event)| event)
207                .collect::<Vec<i32>>(),
208            vec![12, 34, 78, 56, 90] // ordered by (seq, node) pair
209        );
210    }
211
212    #[test]
213    fn scan_multiple_each_multiple_nodes_sparse_seqs() {
214        let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
215
216        // unrealistic/heavy-handed way to specify all sequence numbers
217        composite.views[0].set_current_seq(0);
218        composite.views[0].append([12]);
219        composite.views[1].set_current_seq(1);
220        composite.views[1].append([34]);
221        composite.views[0].set_current_seq(2);
222        composite.views[0].append([56]);
223        composite.views[2].set_current_seq(3);
224        composite.views[2].append([78]);
225        composite.views[1].set_current_seq(4);
226        composite.views[1].append([90]);
227
228        assert_eq!(composite.get_current_seq(), 0);
229        assert_eq!(
230            composite
231                .scan(Seq::MIN, Seq::MAX)
232                .map(|(_, event)| event)
233                .collect::<Vec<i32>>(),
234            vec![12, 34, 56, 78, 90] // nodes don't matter in this case because seqs are unique
235        );
236    }
237}