parasol_db/view/
composite.rs1use crate::{Seq, View};
2
3#[derive(Clone)]
4pub struct CompositeView<V: View> {
5 views: Vec<V>,
6 vector_clock: Vec<Seq>,
7}
8
9impl<V: View> CompositeView<V> {
10 pub fn new(views: Vec<V>) -> Self {
11 let vector_clock = vec![0; views.len()];
12 Self { views, vector_clock }
13 }
14
15 pub fn vector_clock_update(&mut self, node_id: usize, seq: Seq) {
16 self.vector_clock[node_id] = seq;
17 }
18
19 pub fn views_mut(&mut self) -> &mut Vec<V> {
20 &mut self.views
21 }
22}
23
24impl<V: View> View for CompositeView<V>
25where
26 for<'a> V::Iterator: Clone,
27{
28 type Event = V::Event;
29 type Iterator = CompositeViewIterator<V>;
30
31 fn scan(&mut self, start: Seq, end: Seq) -> Self::Iterator {
32 CompositeViewIterator::new(self, start, end)
33 }
34
35 fn get_current_seq(&mut self) -> Seq {
36 self.vector_clock.iter().min().copied().unwrap_or_default()
40 }
41}
42
43pub struct CompositeViewIterator<V: View> {
44 iterators: Vec<V::Iterator>,
45}
46
47impl<'iter, V: View> CompositeViewIterator<V>
48where
49 V::Iterator: Clone,
50{
51 fn new(view: &'iter mut CompositeView<V>, start: Seq, end: Seq) -> Self {
52 Self {
54 iterators: view
55 .views
56 .iter_mut()
57 .map(|view| view.scan(start, end))
58 .collect(),
59 }
60 }
61}
62
63impl<V: View> Iterator for CompositeViewIterator<V>
64where
65 V::Iterator: Clone,
66{
67 type Item = (Seq, V::Event);
68
69 fn next(&mut self) -> Option<Self::Item> {
70 let min_seq_idx = {
71 let mut iterators = self.iterators.iter().cloned().collect::<Vec<_>>();
73
74 let mut min_seq = Seq::MAX;
76 let mut min_seq_idx = None;
77 for (idx, iter) in iterators.iter_mut().enumerate() {
78 if let Some((seq, _)) = iter.next() {
79 if seq < min_seq {
81 min_seq = seq;
82 min_seq_idx = Some(idx);
83 }
84 }
85 }
86
87 min_seq_idx
88 };
89
90 min_seq_idx.and_then(|idx| self.iterators[idx].next())
92 }
93}
94
95impl<V: View> DoubleEndedIterator for CompositeViewIterator<V>
96where
97 V::Iterator: Clone,
98{
99 fn next_back(&mut self) -> Option<Self::Item> {
100 let max_seq_idx = {
101 let mut iterators = self.iterators.iter().cloned().collect::<Vec<_>>();
103
104 let mut max_seq = Seq::MIN;
106 let mut max_seq_idx = None;
107 for (idx, iter) in iterators.iter_mut().enumerate() {
108 if let Some((seq, _)) = iter.next_back() {
109 if seq >= max_seq {
111 max_seq = seq;
112 max_seq_idx = Some(idx);
113 }
114 }
115 }
116
117 max_seq_idx
118 };
119
120 max_seq_idx.and_then(|idx| self.iterators[idx].next_back())
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use super::CompositeView;
128 use crate::table::vec::VecTable;
129 use crate::{Seq, Table, View};
130
131 #[test]
132 fn scan_none() {
133 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
134 assert_eq!(composite.get_current_seq(), 0);
135 assert_eq!(
136 composite
137 .scan(Seq::MIN, Seq::MAX)
138 .map(|(_, event)| event)
139 .collect::<Vec<i32>>(),
140 Vec::<i32>::new()
141 );
142 }
143
144 #[test]
145 fn scan_one() {
146 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
147
148 composite.views[0].append([12]);
149
150 assert_eq!(composite.get_current_seq(), 0);
151 assert_eq!(
152 composite
153 .scan(Seq::MIN, Seq::MAX)
154 .map(|(_, event)| event)
155 .collect::<Vec<i32>>(),
156 vec![12]
157 );
158 }
159
160 #[test]
161 fn scan_multiple_one_node() {
162 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
163
164 composite.views[0].append([12, 34, 56]);
165
166 assert_eq!(composite.get_current_seq(), 0);
167 assert_eq!(
168 composite
169 .scan(Seq::MIN, Seq::MAX)
170 .map(|(_, event)| event)
171 .collect::<Vec<i32>>(),
172 vec![12, 34, 56]
173 );
174 }
175
176 #[test]
177 fn scan_multiple_multiple_nodes() {
178 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
179
180 composite.views[0].append([12]);
181 composite.views[1].append([34]);
182 composite.views[2].append([56]);
183
184 assert_eq!(composite.get_current_seq(), 0);
185 assert_eq!(
186 composite
187 .scan(Seq::MIN, Seq::MAX)
188 .map(|(_, event)| event)
189 .collect::<Vec<i32>>(),
190 vec![12, 34, 56]
191 );
192 }
193
194 #[test]
195 fn scan_multiple_each_multiple_nodes() {
196 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
197
198 composite.views[0].append([12, 56]);
199 composite.views[1].append([34, 90]);
200 composite.views[2].append([78]);
201
202 assert_eq!(composite.get_current_seq(), 0);
203 assert_eq!(
204 composite
205 .scan(Seq::MIN, Seq::MAX)
206 .map(|(_, event)| event)
207 .collect::<Vec<i32>>(),
208 vec![12, 34, 78, 56, 90] );
210 }
211
212 #[test]
213 fn scan_multiple_each_multiple_nodes_sparse_seqs() {
214 let mut composite = CompositeView::<VecTable<i32>>::new(vec![VecTable::new(); 5]);
215
216 composite.views[0].set_current_seq(0);
218 composite.views[0].append([12]);
219 composite.views[1].set_current_seq(1);
220 composite.views[1].append([34]);
221 composite.views[0].set_current_seq(2);
222 composite.views[0].append([56]);
223 composite.views[2].set_current_seq(3);
224 composite.views[2].append([78]);
225 composite.views[1].set_current_seq(4);
226 composite.views[1].append([90]);
227
228 assert_eq!(composite.get_current_seq(), 0);
229 assert_eq!(
230 composite
231 .scan(Seq::MIN, Seq::MAX)
232 .map(|(_, event)| event)
233 .collect::<Vec<i32>>(),
234 vec![12, 34, 56, 78, 90] );
236 }
237}