Skip to main content

palimpsest_dataflow/operators/
mod.rs

//! Specialized differential dataflow operators.
//!
//! Differential dataflow introduces a small number of specialized operators on collections. These
//! operators have specialized implementations to make them work efficiently, and complement the
//! operations defined directly on the `Collection` type (e.g. `map` and `filter`).
6
7pub use self::count::CountTotal;
8pub use self::iterate::Iterate;
9pub use self::join::{Join, JoinCore};
10pub use self::reduce::{Count, Reduce, Threshold};
11pub use self::threshold::ThresholdTotal;
12
13pub mod arrange;
14pub mod consolidate;
15pub mod count;
16pub mod iterate;
17pub mod join;
18pub mod reduce;
19pub mod threshold;
20
21use crate::lattice::Lattice;
22use crate::trace::Cursor;
23
24/// An accumulation of (value, time, diff) updates.
25struct EditList<'a, C: Cursor> {
26    values: Vec<(C::Val<'a>, usize)>,
27    edits: Vec<(C::Time, C::Diff)>,
28}
29
30impl<'a, C: Cursor> EditList<'a, C> {
31    /// Creates an empty list of edits.
32    #[inline]
33    fn new() -> Self {
34        EditList {
35            values: Vec::new(),
36            edits: Vec::new(),
37        }
38    }
39    /// Loads the contents of a cursor.
40    fn load<L>(&mut self, cursor: &mut C, storage: &'a C::Storage, logic: L)
41    where
42        L: Fn(C::TimeGat<'_>) -> C::Time,
43    {
44        self.clear();
45        while let Some(val) = cursor.get_val(storage) {
46            cursor.map_times(storage, |time1, diff1| {
47                self.push(logic(time1), C::owned_diff(diff1))
48            });
49            self.seal(val);
50            cursor.step_val(storage);
51        }
52    }
53    /// Clears the list of edits.
54    #[inline]
55    fn clear(&mut self) {
56        self.values.clear();
57        self.edits.clear();
58    }
59    fn len(&self) -> usize {
60        self.edits.len()
61    }
62    /// Inserts a new edit for an as-yet undetermined value.
63    #[inline]
64    fn push(&mut self, time: C::Time, diff: C::Diff) {
65        // TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
66        self.edits.push((time, diff));
67    }
68    /// Associates all edits pushed since the previous `seal_value` call with `value`.
69    #[inline]
70    fn seal(&mut self, value: C::Val<'a>) {
71        let prev = self.values.last().map(|x| x.1).unwrap_or(0);
72        crate::consolidation::consolidate_from(&mut self.edits, prev);
73        if self.edits.len() > prev {
74            self.values.push((value, self.edits.len()));
75        }
76    }
77    fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
78        for index in 0..self.values.len() {
79            let lower = if index == 0 {
80                0
81            } else {
82                self.values[index - 1].1
83            };
84            let upper = self.values[index].1;
85            for edit in lower..upper {
86                logic(
87                    self.values[index].0,
88                    &self.edits[edit].0,
89                    &self.edits[edit].1,
90                );
91            }
92        }
93    }
94}
95
96struct ValueHistory<'storage, C: Cursor> {
97    edits: EditList<'storage, C>,
98    history: Vec<(C::Time, C::Time, usize, usize)>, // (time, meet, value_index, edit_offset)
99    buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, // where we accumulate / collapse updates.
100}
101
102impl<'storage, C: Cursor> ValueHistory<'storage, C> {
103    fn new() -> Self {
104        ValueHistory {
105            edits: EditList::new(),
106            history: Vec::new(),
107            buffer: Vec::new(),
108        }
109    }
110    fn clear(&mut self) {
111        self.edits.clear();
112        self.history.clear();
113        self.buffer.clear();
114    }
115    fn load<L>(&mut self, cursor: &mut C, storage: &'storage C::Storage, logic: L)
116    where
117        L: Fn(C::TimeGat<'_>) -> C::Time,
118    {
119        self.edits.load(cursor, storage, logic);
120    }
121
122    /// Loads and replays a specified key.
123    ///
124    /// If the key is absent, the replayed history will be empty.
125    fn replay_key<'history, L>(
126        &'history mut self,
127        cursor: &mut C,
128        storage: &'storage C::Storage,
129        key: C::Key<'storage>,
130        logic: L,
131    ) -> HistoryReplay<'storage, 'history, C>
132    where
133        L: Fn(C::TimeGat<'_>) -> C::Time,
134    {
135        self.clear();
136        cursor.seek_key(storage, key);
137        if cursor.get_key(storage) == Some(key) {
138            self.load(cursor, storage, logic);
139        }
140        self.replay()
141    }
142
143    /// Organizes history based on current contents of edits.
144    fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
145        self.buffer.clear();
146        self.history.clear();
147        for value_index in 0..self.edits.values.len() {
148            let lower = if value_index > 0 {
149                self.edits.values[value_index - 1].1
150            } else {
151                0
152            };
153            let upper = self.edits.values[value_index].1;
154            for edit_index in lower..upper {
155                let time = self.edits.edits[edit_index].0.clone();
156                self.history
157                    .push((time.clone(), time, value_index, edit_index));
158            }
159        }
160
161        self.history.sort_by(|x, y| y.cmp(x));
162        for index in 1..self.history.len() {
163            self.history[index].1 = self.history[index].1.meet(&self.history[index - 1].1);
164        }
165
166        HistoryReplay { replay: self }
167    }
168}
169
170struct HistoryReplay<'storage, 'history, C: Cursor> {
171    replay: &'history mut ValueHistory<'storage, C>,
172}
173
174impl<'storage, 'history, C: Cursor> HistoryReplay<'storage, 'history, C> {
175    fn time(&self) -> Option<&C::Time> {
176        self.replay.history.last().map(|x| &x.0)
177    }
178    fn meet(&self) -> Option<&C::Time> {
179        self.replay.history.last().map(|x| &x.1)
180    }
181    fn edit(&self) -> Option<(C::Val<'storage>, &C::Time, &C::Diff)> {
182        self.replay.history.last().map(|&(ref t, _, v, e)| {
183            (
184                self.replay.edits.values[v].0,
185                t,
186                &self.replay.edits.edits[e].1,
187            )
188        })
189    }
190
191    fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
192        &self.replay.buffer[..]
193    }
194
195    fn step(&mut self) {
196        let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
197        self.replay.buffer.push((
198            (self.replay.edits.values[value_index].0, time),
199            self.replay.edits.edits[edit_offset].1.clone(),
200        ));
201    }
202    fn step_while_time_is(&mut self, time: &C::Time) -> bool {
203        let mut found = false;
204        while self.time() == Some(time) {
205            found = true;
206            self.step();
207        }
208        found
209    }
210    fn advance_buffer_by(&mut self, meet: &C::Time) {
211        for element in self.replay.buffer.iter_mut() {
212            (element.0).1 = (element.0).1.join(meet);
213        }
214        crate::consolidation::consolidate(&mut self.replay.buffer);
215    }
216    fn is_done(&self) -> bool {
217        self.replay.history.is_empty()
218    }
219}