differential_dataflow/operators/
mod.rs1pub use self::reduce::{Reduce, Threshold, Count};
8pub use self::iterate::Iterate;
9pub use self::join::{Join, JoinCore};
10pub use self::count::CountTotal;
11pub use self::threshold::ThresholdTotal;
12
13pub mod arrange;
14pub mod reduce;
15pub mod consolidate;
16pub mod iterate;
17pub mod join;
18pub mod count;
19pub mod threshold;
20
21use crate::difference::Semigroup;
22use crate::lattice::Lattice;
23use crate::trace::Cursor;
24
25struct EditList<'a, C: Cursor> where C::Time: Sized, C::Diff: Sized {
27 values: Vec<(C::Val<'a>, usize)>,
28 edits: Vec<(C::Time, C::Diff)>,
29}
30
31impl<'a, C: Cursor> EditList<'a, C> where C::Time: Ord+Clone, C::Diff: Semigroup {
32 #[inline]
34 fn new() -> Self {
35 EditList {
36 values: Vec::new(),
37 edits: Vec::new(),
38 }
39 }
40 fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
42 where
43 L: Fn(&C::Time)->C::Time,
44 {
45 self.clear();
46 while cursor.val_valid(storage) {
47 cursor.map_times(storage, |time1, diff1| self.push(logic(time1), diff1.clone()));
48 self.seal(cursor.val(storage));
49 cursor.step_val(storage);
50 }
51 }
52 #[inline]
54 fn clear(&mut self) {
55 self.values.clear();
56 self.edits.clear();
57 }
58 fn len(&self) -> usize { self.edits.len() }
59 #[inline]
61 fn push(&mut self, time: C::Time, diff: C::Diff) {
62 self.edits.push((time, diff));
64 }
65 #[inline]
67 fn seal(&mut self, value: C::Val<'a>) {
68 let prev = self.values.last().map(|x| x.1).unwrap_or(0);
69 crate::consolidation::consolidate_from(&mut self.edits, prev);
70 if self.edits.len() > prev {
71 self.values.push((value, self.edits.len()));
72 }
73 }
74 fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
75 for index in 0 .. self.values.len() {
76 let lower = if index == 0 { 0 } else { self.values[index-1].1 };
77 let upper = self.values[index].1;
78 for edit in lower .. upper {
79 logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
80 }
81 }
82 }
83}
84
85struct ValueHistory<'storage, C: Cursor> where C::Time: Sized, C::Diff: Sized {
86
87 edits: EditList<'storage, C>,
88 history: Vec<(C::Time, C::Time, usize, usize)>, buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, }
91
92impl<'storage, C: Cursor> ValueHistory<'storage, C>
93where
94 C::Time: Lattice+Ord+Clone,
95 C::Diff: Semigroup,
96{
97 fn new() -> Self {
98 ValueHistory {
99 edits: EditList::new(),
100 history: Vec::new(),
101 buffer: Vec::new(),
102 }
103 }
104 fn clear(&mut self) {
105 self.edits.clear();
106 self.history.clear();
107 self.buffer.clear();
108 }
109 fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
110 where
111 L: Fn(&C::Time)->C::Time,
112 {
113 self.edits.load(cursor, storage, logic);
114 }
115
116 fn replay_key<'history, L>(
120 &'history mut self,
121 cursor: &mut C,
122 storage: &'storage C::Storage,
123 key: C::Key<'storage>,
124 logic: L
125 ) -> HistoryReplay<'storage, 'history, C>
126 where
127 L: Fn(&C::Time)->C::Time,
128 {
129 self.clear();
130 cursor.seek_key(storage, key);
131 if cursor.get_key(storage) == Some(key) {
132 self.load(cursor, storage, logic);
133 }
134 self.replay()
135 }
136
137 fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
139
140 self.buffer.clear();
141 self.history.clear();
142 for value_index in 0 .. self.edits.values.len() {
143 let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
144 let upper = self.edits.values[value_index].1;
145 for edit_index in lower .. upper {
146 let time = self.edits.edits[edit_index].0.clone();
147 self.history.push((time.clone(), time, value_index, edit_index));
148 }
149 }
150
151 self.history.sort_by(|x,y| y.cmp(x));
152 for index in 1 .. self.history.len() {
153 self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
154 }
155
156 HistoryReplay {
157 replay: self
158 }
159 }
160}
161
162struct HistoryReplay<'storage, 'history, C>
163where
164 'storage: 'history,
165 C: Cursor,
166 C::Time: Lattice+Ord+Clone+'history,
167 C::Diff: Semigroup+'history,
168{
169 replay: &'history mut ValueHistory<'storage, C>
170}
171
172impl<'storage, 'history, C> HistoryReplay<'storage, 'history, C>
173where
174 'storage: 'history,
175 C: Cursor,
176 C::Time: Lattice+Ord+Clone+'history,
177 C::Diff: Semigroup+'history,
178{
179 fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
180 fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
181 fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
182 self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
183 }
184
185 fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
186 &self.replay.buffer[..]
187 }
188
189 fn step(&mut self) {
190 let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
191 self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
192 }
193 fn step_while_time_is(&mut self, time: &C::Time) -> bool {
194 let mut found = false;
195 while self.time() == Some(time) {
196 found = true;
197 self.step();
198 }
199 found
200 }
201 fn advance_buffer_by(&mut self, meet: &C::Time) {
202 for element in self.replay.buffer.iter_mut() {
203 (element.0).1 = (element.0).1.join(meet);
204 }
205 crate::consolidation::consolidate(&mut self.replay.buffer);
206 }
207 fn is_done(&self) -> bool { self.replay.history.is_empty() }
208}