differential_dataflow/operators/
mod.rs1pub use self::iterate::Iterate;
8pub use self::count::CountTotal;
9pub use self::threshold::ThresholdTotal;
10
11pub mod arrange;
12pub mod reduce;
13pub mod iterate;
14pub mod join;
15pub mod count;
16pub mod threshold;
17
18use crate::lattice::Lattice;
19use crate::trace::Cursor;
20
21struct EditList<'a, C: Cursor> {
23 values: Vec<(C::Val<'a>, usize)>,
24 edits: Vec<(C::Time, C::Diff)>,
25}
26
27impl<'a, C: Cursor> EditList<'a, C> {
28 #[inline]
30 fn new() -> Self {
31 EditList {
32 values: Vec::new(),
33 edits: Vec::new(),
34 }
35 }
36 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
38 where
39 L: Fn(C::TimeGat<'_>)->C::Time,
40 {
41 self.clear();
42 while let Some(val) = cursor.get_val(storage) {
43 cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1)));
44 self.seal(val);
45 cursor.step_val(storage);
46 }
47 }
48 #[inline]
50 fn clear(&mut self) {
51 self.values.clear();
52 self.edits.clear();
53 }
54 fn len(&self) -> usize { self.edits.len() }
55 #[inline]
57 fn push(&mut self, time: C::Time, diff: C::Diff) {
58 self.edits.push((time, diff));
60 }
61 #[inline]
63 fn seal(&mut self, value: C::Val<'a>) {
64 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
65 crate::consolidation::consolidate_from(&mut self.edits, prev);
66 if self.edits.len() > prev {
67 self.values.push((value, self.edits.len()));
68 }
69 }
70 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
71 for index in 0 .. self.values.len() {
72 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
73 let upper = self.values[index].1;
74 for edit in lower .. upper {
75 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
76 }
77 }
78 }
79}
80
81struct ValueHistory<'storage, C: Cursor> {
82 edits: EditList<'storage, C>,
83 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
86
87impl<'storage, C: Cursor> ValueHistory<'storage, C> {
88 fn new() -> Self {
89 ValueHistory {
90 edits: EditList::new(),
91 history: Vec::new(),
92 buffer: Vec::new(),
93 }
94 }
95 fn clear(&mut self) {
96 self.edits.clear();
97 self.history.clear();
98 self.buffer.clear();
99 }
100 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
101 where
102 L: Fn(C::TimeGat<'_>)->C::Time,
103 {
104 self.edits.load(cursor, storage, logic);
105 }
106
107 fn replay_key<'history, L>(
111 &'history mut self,
112 cursor: &mut C,
113 storage: &'storage C::Storage,
114 key: C::Key<'storage>,
115 logic: L
116 ) -> HistoryReplay<'storage, 'history, C>
117 where
118 L: Fn(C::TimeGat<'_>)->C::Time,
119 {
120 self.clear();
121 cursor.seek_key(storage, key);
122 if cursor.get_key(storage) == Some(key) {
123 self.load(cursor, storage, logic);
124 }
125 self.replay()
126 }
127
128 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
130
131 self.buffer.clear();
132 self.history.clear();
133 for value_index in 0 .. self.edits.values.len() {
134 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
135 let upper = self.edits.values[value_index].1;
136 for edit_index in lower .. upper {
137 let time = self.edits.edits[edit_index].0.clone();
138 self.history.push((time.clone(), time, value_index, edit_index));
139 }
140 }
141
142 self.history.sort_by(|x,y| y.cmp(x));
143 for index in 1 .. self.history.len() {
144 self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
145 }
146
147 HistoryReplay {
148 replay: self
149 }
150 }
151}
152
153struct HistoryReplay<'storage, 'history, C: Cursor> {
154 replay: &'history mut ValueHistory<'storage, C>
155}
156
157impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
158 fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
159 fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
160 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
161 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
162 }
163
164 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
165 &self.replay.buffer[..]
166 }
167
168 fn step(&mut self) {
169 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
170 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
171 }
172 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
173 let mut found = false;
174 while self.time() == Some(time) {
175 found = true;
176 self.step();
177 }
178 found
179 }
180 fn advance_buffer_by(&mut self, meet: &C::Time) {
181 for element in self.replay.buffer.iter_mut() {
182 (element.0).1 = (element.0).1.join(meet);
183 }
184 crate::consolidation::consolidate(&mut self.replay.buffer);
185 }
186 fn is_done(&self) -> bool { self.replay.history.is_empty() }
187}