Skip to main content

differential_dataflow/operators/
mod.rs

1//! Specialize differential dataflow operators.
2//!
3//! Differential dataflow introduces a small number of specialized operators on collections. These
4//! operators have specialized implementations to make them work efficiently, and are in addition
5//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
6
7pub use self::iterate::Iterate;
8pub use self::count::CountTotal;
9pub use self::threshold::ThresholdTotal;
10
11pub mod arrange;
12pub mod reduce;
13pub mod iterate;
14pub mod join;
15pub mod count;
16pub mod threshold;
17
18use crate::lattice::Lattice;
19use crate::trace::Cursor;
20
21/// An accumulation of (value, time, diff) updates.
22struct EditList<'a, C: Cursor> {
23    values: Vec<(C::Val<'a>, usize)>,
24    edits: Vec<(C::Time, C::Diff)>,
25}
26
27impl<'a, C: Cursor> EditList<'a, C> {
28    /// Creates an empty list of edits.
29    #[inline]
30    fn new() -> Self {
31        EditList {
32            values: Vec::new(),
33            edits: Vec::new(),
34        }
35    }
36    /// Loads the contents of a cursor.
37    fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
38    where
39        L: Fn(C::TimeGat<'_>)->C::Time,
40    {
41        self.clear();
42        while let Some(val) = cursor.get_val(storage) {
43            cursor.map_times(storage, |time1, diff1| self.push(logic(time1), C::owned_diff(diff1)));
44            self.seal(val);
45            cursor.step_val(storage);
46        }
47    }
48    /// Clears the list of edits.
49    #[inline]
50    fn clear(&mut self) {
51        self.values.clear();
52        self.edits.clear();
53    }
54    fn len(&self) -> usize { self.edits.len() }
55    /// Inserts a new edit for an as-yet undetermined value.
56    #[inline]
57    fn push(&mut self, time: C::Time, diff: C::Diff) {
58        // TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
59        self.edits.push((time, diff));
60    }
61    /// Associates all edits pushed since the previous `seal_value` call with `value`.
62    #[inline]
63    fn seal(&mut self, value: C::Val<'a>) {
64        let prev = self.values.last().map(|x| x.1).unwrap_or(0);
65        crate::consolidation::consolidate_from(&mut self.edits, prev);
66        if self.edits.len() > prev {
67            self.values.push((value, self.edits.len()));
68        }
69    }
70    fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
71        for index in 0 .. self.values.len() {
72            let lower = if index == 0 { 0 } else { self.values[index-1].1 };
73            let upper = self.values[index].1;
74            for edit in lower .. upper {
75                logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
76            }
77        }
78    }
79}
80
81struct ValueHistory<'storage, C: Cursor> {
82    edits: EditList<'storage, C>,
83    history: Vec<(C::Time, C::Time, usize, usize)>,     // (time, meet, value_index, edit_offset)
84    buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>,   // where we accumulate / collapse updates.
85}
86
87impl<'storage, C: Cursor> ValueHistory<'storage, C> {
88    fn new() -> Self {
89        ValueHistory {
90            edits: EditList::new(),
91            history: Vec::new(),
92            buffer: Vec::new(),
93        }
94    }
95    fn clear(&mut self) {
96        self.edits.clear();
97        self.history.clear();
98        self.buffer.clear();
99    }
100    fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
101    where
102        L: Fn(C::TimeGat<'_>)->C::Time,
103    {
104        self.edits.load(cursor, storage, logic);
105    }
106
107    /// Loads and replays a specified key.
108    ///
109    /// If the key is absent, the replayed history will be empty.
110    fn replay_key<'history, L>(
111        &'history mut self,
112        cursor: &mut C,
113        storage: &'storage C::Storage,
114        key: C::Key<'storage>,
115        logic: L
116    ) -> HistoryReplay<'storage, 'history, C>
117    where
118        L: Fn(C::TimeGat<'_>)->C::Time,
119    {
120        self.clear();
121        cursor.seek_key(storage, key);
122        if cursor.get_key(storage) == Some(key) {
123            self.load(cursor, storage, logic);
124        }
125        self.replay()
126    }
127
128    /// Organizes history based on current contents of edits.
129    fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
130
131        self.buffer.clear();
132        self.history.clear();
133        for value_index in 0 .. self.edits.values.len() {
134            let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
135            let upper = self.edits.values[value_index].1;
136            for edit_index in lower .. upper {
137                let time = self.edits.edits[edit_index].0.clone();
138                self.history.push((time.clone(), time, value_index, edit_index));
139            }
140        }
141
142        self.history.sort_by(|x,y| y.cmp(x));
143        for index in 1 .. self.history.len() {
144            self.history[index].1 = self.history[index].1.meet(&self.history[index-1].1);
145        }
146
147        HistoryReplay {
148            replay: self
149        }
150    }
151}
152
153struct HistoryReplay<'storage, 'history, C: Cursor> {
154    replay: &'history mut ValueHistory<'storage, C>
155}
156
157impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
158    fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
159    fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
160    fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
161        self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
162    }
163
164    fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
165        &self.replay.buffer[..]
166    }
167
168    fn step(&mut self) {
169        let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
170        self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
171    }
172    fn step_while_time_is(&mut self, time: &C::Time) -> bool {
173        let mut found = false;
174        while self.time() == Some(time) {
175            found = true;
176            self.step();
177        }
178        found
179    }
180    fn advance_buffer_by(&mut self, meet: &C::Time) {
181        for element in self.replay.buffer.iter_mut() {
182            (element.0).1 = (element.0).1.join(meet);
183        }
184        crate::consolidation::consolidate(&mut self.replay.buffer);
185    }
186    fn is_done(&self) -> bool { self.replay.history.is_empty() }
187}