Skip to main content

differential_dataflow/operators/
mod.rs

//! Specialized differential dataflow operators.
//!
//! Differential dataflow introduces a small number of specialized operators on collections. These
//! operators have specialized implementations to make them work efficiently, and are in addition
//! to several operations defined directly on the `Collection` type (e.g. `map` and `filter`).
6
7pub use self::iterate::Iterate;
8pub use self::count::CountTotal;
9pub use self::threshold::ThresholdTotal;
10
11pub mod arrange;
12pub mod reduce;
13pub mod iterate;
14pub mod join;
15pub mod count;
16pub mod threshold;
17
18use crate::lattice::Lattice;
19use crate::trace::Cursor;
20
/// An accumulation of (value, time, diff) updates.
///
/// Edits for all values live in one flat `edits` vector. Each entry of `values`
/// pairs a value with the exclusive end offset of its run in `edits`; the run
/// starts where the previous value's run ended (or at `0` for the first value).
pub struct EditList<V, T, D> {
    /// Sealed values, each with the exclusive end offset of its edits in `edits`.
    values: Vec<(V, usize)>,
    /// All `(time, diff)` edits, stored contiguously per value.
    edits: Vec<(T, D)>,
}
26
27impl<V: Copy, T: Ord + Lattice, D: crate::difference::Semigroup> EditList<V, T, D> {
28    /// Creates an empty list of edits.
29    #[inline]
30    fn new() -> Self {
31        EditList {
32            values: Vec::new(),
33            edits: Vec::new(),
34        }
35    }
36    /// Walks the cursor's vals at the current key into `self`, advancing times by `meet` if supplied.
37    ///
38    /// The cursor is assumed to be positioned at a key already; callers that need
39    /// to seek should use [`Cursor::populate_key`] (or [`ValueHistory::replay_key`])
40    /// instead. This split avoids a redundant seek in the merge-join inner loop,
41    /// where the cursor is positioned by the upstream merge step.
42    fn load<'a, C>(&mut self, cursor: &mut C, storage: &'a C::Storage, meet: Option<&T>)
43    where
44        C: Cursor<Val<'a> = V, Time = T, Diff = D>,
45    {
46        self.clear();
47        while let Some(val) = cursor.get_val(storage) {
48            cursor.map_times(storage, |time, diff| {
49                let mut t = C::owned_time(time);
50                if let Some(m) = meet { t.join_assign(m); }
51                self.push(t, C::owned_diff(diff));
52            });
53            self.seal(val);
54            cursor.step_val(storage);
55        }
56    }
57    /// Clears the list of edits.
58    #[inline]
59    pub fn clear(&mut self) {
60        self.values.clear();
61        self.edits.clear();
62    }
63    fn len(&self) -> usize { self.edits.len() }
64    /// Inserts a new edit for an as-yet undetermined value.
65    #[inline]
66    pub fn push(&mut self, time: T, diff: D) {
67        // TODO: Could attempt "insertion-sort" like behavior here, where we collapse if possible.
68        self.edits.push((time, diff));
69    }
70    /// Associates all edits pushed since the previous `seal_value` call with `value`.
71    #[inline]
72    pub fn seal(&mut self, value: V) {
73        let prev = self.values.last().map(|x| x.1).unwrap_or(0);
74        crate::consolidation::consolidate_from(&mut self.edits, prev);
75        if self.edits.len() > prev {
76            self.values.push((value, self.edits.len()));
77        }
78    }
79    fn map<F: FnMut(V, &T, &D)>(&self, mut logic: F) {
80        for index in 0 .. self.values.len() {
81            let lower = if index == 0 { 0 } else { self.values[index-1].1 };
82            let upper = self.values[index].1;
83            for edit in lower .. upper {
84                logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
85            }
86        }
87    }
88}
89
90struct ValueHistory<V, T, D> {
91    edits: EditList<V, T, D>,
92    history: Vec<(T, T, usize, usize)>,     // (time, meet, value_index, edit_offset)
93    buffer: Vec<((V, T), D)>,               // where we accumulate / collapse updates.
94}
95
96impl<V: Copy + Ord, T: Ord + Clone + Lattice, D: crate::difference::Semigroup> ValueHistory<V, T, D> {
97    fn new() -> Self {
98        ValueHistory {
99            edits: EditList::new(),
100            history: Vec::new(),
101            buffer: Vec::new(),
102        }
103    }
104    fn clear(&mut self) {
105        self.edits.clear();
106        self.history.clear();
107        self.buffer.clear();
108    }
109
110    /// Loads and replays a specified key.
111    ///
112    /// If the key is absent, the replayed history will be empty.
113    fn replay_key<'a, 'history, C>(
114        &'history mut self,
115        cursor: &mut C,
116        storage: &'a C::Storage,
117        key: C::Key<'a>,
118        meet: Option<&T>,
119    ) -> HistoryReplay<'history, V, T, D>
120    where
121        C: Cursor<Val<'a> = V, Time = T, Diff = D>,
122    {
123        self.clear();
124        cursor.populate_key(storage, key, meet, &mut self.edits);
125        self.replay()
126    }
127
128    /// Organizes history based on current contents of edits.
129    fn replay<'history>(&'history mut self) -> HistoryReplay<'history, V, T, D> {
130
131        self.buffer.clear();
132        self.history.clear();
133        for value_index in 0 .. self.edits.values.len() {
134            let lower = if value_index > 0 { self.edits.values[value_index-1].1 } else { 0 };
135            let upper = self.edits.values[value_index].1;
136            for edit_index in lower .. upper {
137                let time = self.edits.edits[edit_index].0.clone();
138                self.history.push((time.clone(), time, value_index, edit_index));
139            }
140        }
141
142        self.history.sort_by(|x,y| y.cmp(x));
143        self.history.iter_mut().reduce(|prev, cur| { cur.1.meet_assign(&prev.1); cur });
144
145        HistoryReplay { replay: self }
146    }
147}
148
149struct HistoryReplay<'history, V, T, D> {
150    replay: &'history mut ValueHistory<V, T, D>,
151}
152
153impl<'history, V: Copy + Ord, T: Ord + Clone + Lattice, D: Clone + crate::difference::Semigroup> HistoryReplay<'history, V, T, D> {
154    fn time(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.0) }
155    fn meet(&self) -> Option<&T> { self.replay.history.last().map(|x| &x.1) }
156    fn edit(&self) -> Option<(V, &T, &D)> {
157        self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
158    }
159
160    fn buffer(&self) -> &[((V, T), D)] {
161        &self.replay.buffer[..]
162    }
163
164    fn step(&mut self) {
165        let (time, _, value_index, edit_offset) = self.replay.history.pop().unwrap();
166        self.replay.buffer.push(((self.replay.edits.values[value_index].0, time), self.replay.edits.edits[edit_offset].1.clone()));
167    }
168    fn step_while_time_is(&mut self, time: &T) -> bool {
169        let mut found = false;
170        while self.time() == Some(time) {
171            found = true;
172            self.step();
173        }
174        found
175    }
176    fn advance_buffer_by(&mut self, meet: &T) {
177        for element in self.replay.buffer.iter_mut() {
178            (element.0).1.join_assign(meet);
179        }
180        crate::consolidation::consolidate(&mut self.replay.buffer);
181    }
182    fn is_done(&self) -> bool { self.replay.history.is_empty() }
183}