differential_dataflow/operators/
mod.rs1pub use self::iterate::Iterate;
8pub use self::count::CountTotal;
9pub use self::threshold::ThresholdTotal;
10
11pub mod arrange;
12pub mod reduce;
13pub mod iterate;
14pub mod join;
15pub mod count;
16pub mod threshold;
17
18use crate::lattice::Lattice;
19use crate::trace::Cursor;
20
21pub struct EditList<V, T, D> {
23 values: Vec<(V, usize)>,
24 edits: Vec<(T, D)>,
25}
26
27impl<V: Copy, T: Ord + Lattice, D: crate::difference::Semigroup> EditList<V, T, D> {
28 #[inline]
30 fn new() -> Self {
31 EditList {
32 values: Vec::new(),
33 edits: Vec::new(),
34 }
35 }
36 fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: Option<&T>)
43 where
44 C: Cursor<Val<'a> = V, Time = T, Diff = D>,
45 {
46 self.clear();
47 while let Some(val) = cursor.get_val(storage) {
48 cursor.map_times(storage, |time, diff| {
49 let mut t = C::owned_time(time);
50 if let Some(m) = meet { t.join_assign(m); }
51 self.push(t, C::owned_diff(diff));
52 });
53 self.seal(val);
54 cursor.step_val(storage);
55 }
56 }
57 #[inline]
59 pub fn clear(&mut self) {
60 self.values.clear();
61 self.edits.clear();
62 }
63 fn len(&self) -> usize { self.edits.len() }
64 #[inline]
66 pub fn push(&mut self, time: T, diff: D) {
67 self.edits.push((time, diff));
69 }
70 #[inline]
72 pub fn seal(&mut self, value: V) {
73 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
74 crate::consolidation::consolidate_from(&mut self.edits, prev);
75 if self.edits.len() > prev {
76 self.values.push((value, self.edits.len()));
77 }
78 }
79 fn map<F: FnMut(V, &T, &D)>(&self, mut logic: F) {
80 for index in 0 .. self.values.len() {
81 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
82 let upper = self.values[index].1;
83 for edit in lower .. upper {
84 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
85 }
86 }
87 }
88}
89
90struct ValueHistory<V, T, D> {
91 edits: EditList<V, T, D>,
92 history: Vec<(T, T, usize, usize)>, buffer: Vec<((V, T), D)>, }
95
96impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> ValueHistory<V, T, D> {
97 fn new() -> Self {
98 ValueHistory {
99 edits: EditList::new(),
100 history: Vec::new(),
101 buffer: Vec::new(),
102 }
103 }
104 fn clear(&mut self) {
105 self.edits.clear();
106 self.history.clear();
107 self.buffer.clear();
108 }
109
110 fn replay_key<'a, 'history, C>(
114 &'history mut self,
115 cursor: &mut C,
116 storage: &'a C::Storage,
117 key: C::Key<'a>,
118 meet: Option<&T>,
119 ) -> HistoryReplay<'history, V, T, D>
120 where
121 C: Cursor<Val<'a> = V, Time = T, Diff = D>,
122 {
123 self.clear();
124 cursor.populate_key(storage, key, meet, &mut self.edits);
125 self.replay()
126 }
127
128 fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> {
130
131 self.buffer.clear();
132 self.history.clear();
133 for value_index in 0 .. self.edits.values.len() {
134 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
135 let upper = self.edits.values[value_index].1;
136 for edit_index in lower .. upper {
137 let time = self.edits.edits[edit_index].0.clone();
138 self.history.push((time.clone(), time, value_index, edit_index));
139 }
140 }
141
142 self.history.sort_by(|x,y| y.cmp(x));
143 self.history.iter_mut().reduce(|prev, cur| { cur.1.meet_assign(&prev.1); cur });
144
145 HistoryReplay { replay: self }
146 }
147}
148
149struct HistoryReplay<'history, V, T, D> {
150 replay: &'history mut ValueHistory<V, T, D>,
151}
152
153impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::difference::Semigroup> HistoryReplay<'history, V, T, D> {
154 fn time(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.0) }
155 fn meet(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.1) }
156 fn edit(&self) -> Option<(V, &T, &D)> {
157 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
158 }
159
160 fn buffer(&self) -> &[((V, T), D)] {
161 &self.replay.buffer[..]
162 }
163
164 fn step(&mut self) {
165 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
166 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
167 }
168 fn step_while_time_is(&mut self, time: &T) -> bool {
169 let mut found = false;
170 while self.time() == Some(time) {
171 found = true;
172 self.step();
173 }
174 found
175 }
176 fn advance_buffer_by(&mut self, meet: &T) {
177 for element in self.replay.buffer.iter_mut() {
178 (element.0).1.join_assign(meet);
179 }
180 crate::consolidation::consolidate(&mut self.replay.buffer);
181 }
182 fn is_done(&self) -> bool { self.replay.history.is_empty() }
183}