palimpsest_dataflow/operators/
mod.rs1pub use self::count::CountTotal;
8pub use self::iterate::Iterate;
9pub use self::join::{Join, JoinCore};
10pub use self::reduce::{Count, Reduce, Threshold};
11pub use self::threshold::ThresholdTotal;
12
13pub mod arrange;
14pub mod consolidate;
15pub mod count;
16pub mod iterate;
17pub mod join;
18pub mod reduce;
19pub mod threshold;
20
21use crate::lattice::Lattice;
22use crate::trace::Cursor;
23
24struct EditList<'a, C: Cursor> {
26 values: Vec<(C::Val<'a>, usize)>,
27 edits: Vec<(C::Time, C::Diff)>,
28}
29
30impl<'a, C: Cursor> EditList<'a, C> {
31 #[inline]
33 fn new() -> Self {
34 EditList {
35 values: Vec::new(),
36 edits: Vec::new(),
37 }
38 }
39 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
41 where
42 L: Fn(C::TimeGat<'_>) -> C::Time,
43 {
44 self.clear();
45 while let Some(val) = cursor.get_val(storage) {
46 cursor.map_times(storage, |time1, diff1| {
47 self.push(logic(time1), C::owned_diff(diff1))
48 });
49 self.seal(val);
50 cursor.step_val(storage);
51 }
52 }
53 #[inline]
55 fn clear(&mut self) {
56 self.values.clear();
57 self.edits.clear();
58 }
59 fn len(&self) -> usize {
60 self.edits.len()
61 }
62 #[inline]
64 fn push(&mut self, time: C::Time, diff: C::Diff) {
65 self.edits.push((time, diff));
67 }
68 #[inline]
70 fn seal(&mut self, value: C::Val<'a>) {
71 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
72 crate::consolidation::consolidate_from(&mut self.edits, prev);
73 if self.edits.len() > prev {
74 self.values.push((value, self.edits.len()));
75 }
76 }
77 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
78 for index in 0..self.values.len() {
79 let lower = if index == 0 {
80 0
81 } else {
82 self.values[index - 1].1
83 };
84 let upper = self.values[index].1;
85 for edit in lower..upper {
86 logic(
87 self.values[index].0,
88 &self.edits[edit].0,
89 &self.edits[edit].1,
90 );
91 }
92 }
93 }
94}
95
96struct ValueHistory<'storage, C: Cursor> {
97 edits: EditList<'storage, C>,
98 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
101
102impl<'storage, C: Cursor> ValueHistory<'storage, C> {
103 fn new() -> Self {
104 ValueHistory {
105 edits: EditList::new(),
106 history: Vec::new(),
107 buffer: Vec::new(),
108 }
109 }
110 fn clear(&mut self) {
111 self.edits.clear();
112 self.history.clear();
113 self.buffer.clear();
114 }
115 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
116 where
117 L: Fn(C::TimeGat<'_>) -> C::Time,
118 {
119 self.edits.load(cursor, storage, logic);
120 }
121
122 fn replay_key<'history, L>(
126 &'history mut self,
127 cursor: &mut C,
128 storage: &'storage C::Storage,
129 key: C::Key<'storage>,
130 logic: L,
131 ) -> HistoryReplay<'storage, 'history, C>
132 where
133 L: Fn(C::TimeGat<'_>) -> C::Time,
134 {
135 self.clear();
136 cursor.seek_key(storage, key);
137 if cursor.get_key(storage) == Some(key) {
138 self.load(cursor, storage, logic);
139 }
140 self.replay()
141 }
142
143 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
145 self.buffer.clear();
146 self.history.clear();
147 for value_index in 0..self.edits.values.len() {
148 let lower = if value_index > 0 {
149 self.edits.values[value_index - 1].1
150 } else {
151 0
152 };
153 let upper = self.edits.values[value_index].1;
154 for edit_index in lower..upper {
155 let time = self.edits.edits[edit_index].0.clone();
156 self.history
157 .push((time.clone(), time, value_index, edit_index));
158 }
159 }
160
161 self.history.sort_by(|x, y| y.cmp(x));
162 for index in 1..self.history.len() {
163 self.history[index].1 = self.history[index].1.meet(&self.history[index - 1].1);
164 }
165
166 HistoryReplay { replay: self }
167 }
168}
169
170struct HistoryReplay<'storage, 'history, C: Cursor> {
171 replay: &'history mut ValueHistory<'storage, C>,
172}
173
174impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
175 fn time(&self) -> Option<&C::Time> {
176 self.replay.history.last().map(|x| &x.0)
177 }
178 fn meet(&self) -> Option<&C::Time> {
179 self.replay.history.last().map(|x| &x.1)
180 }
181 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
182 self.replay.history.last().map(|&(ref t, _, v, e)| {
183 (
184 self.replay.edits.values[v].0,
185 t,
186 &self.replay.edits.edits[e].1,
187 )
188 })
189 }
190
191 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
192 &self.replay.buffer[..]
193 }
194
195 fn step(&mut self) {
196 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
197 self.replay.buffer.push((
198 (self.replay.edits.values[value_index].0, time),
199 self.replay.edits.edits[edit_offset].1.clone(),
200 ));
201 }
202 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
203 let mut found = false;
204 while self.time() == Some(time) {
205 found = true;
206 self.step();
207 }
208 found
209 }
210 fn advance_buffer_by(&mut self, meet: &C::Time) {
211 for element in self.replay.buffer.iter_mut() {
212 (element.0).1 = (element.0).1.join(meet);
213 }
214 crate::consolidation::consolidate(&mut self.replay.buffer);
215 }
216 fn is_done(&self) -> bool {
217 self.replay.history.is_empty()
218 }
219}