Skip to main content

palimpsest_dataflow/operators/
threshold.rs

1//! Reduce the collection to one occurrence of each distinct element.
2//!
3//! The `distinct_total` and `distinct_total_u` operators are optimizations of the more general
4//! `distinct` and `distinct_u` operators for the case in which time is totally ordered.
5
6use timely::dataflow::channels::pact::Pipeline;
7use timely::dataflow::operators::Operator;
8use timely::dataflow::*;
9use timely::order::TotalOrder;
10
11use crate::collection::AsCollection;
12use crate::difference::{Abelian, Semigroup};
13use crate::hashable::Hashable;
14use crate::lattice::Lattice;
15use crate::operators::arrange::{ArrangeBySelf, Arranged};
16use crate::trace::{BatchReader, Cursor, TraceReader};
17use crate::{ExchangeData, VecCollection};
18
19/// Extension trait for the `distinct` differential dataflow method.
20pub trait ThresholdTotal<
21    G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
22    K: ExchangeData,
23    R: ExchangeData + Semigroup,
24>
25{
26    /// Reduces the collection to one occurrence of each distinct element.
27    fn threshold_semigroup<R2, F>(&self, thresh: F) -> VecCollection<G, K, R2>
28    where
29        R2: Semigroup + 'static,
30        F: FnMut(&K, &R, Option<&R>) -> Option<R2> + 'static;
31    /// Reduces the collection to one occurrence of each distinct element.
32    ///
33    /// # Examples
34    ///
35    /// ```
36    /// use palimpsest_dataflow::input::Input;
37    /// use palimpsest_dataflow::operators::ThresholdTotal;
38    ///
39    /// ::timely::example(|scope| {
40    ///     // report the number of occurrences of each key
41    ///     scope.new_collection_from(1 .. 10).1
42    ///          .map(|x| x / 3)
43    ///          .threshold_total(|_,c| c % 2);
44    /// });
45    /// ```
46    fn threshold_total<R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static>(
47        &self,
48        mut thresh: F,
49    ) -> VecCollection<G, K, R2> {
50        self.threshold_semigroup(move |key, new, old| {
51            let mut new = thresh(key, new);
52            if let Some(old) = old {
53                let mut add = thresh(key, old);
54                add.negate();
55                new.plus_equals(&add);
56            }
57            if !new.is_zero() {
58                Some(new)
59            } else {
60                None
61            }
62        })
63    }
64    /// Reduces the collection to one occurrence of each distinct element.
65    ///
66    /// This reduction only tests whether the weight associated with a record is non-zero, and otherwise
67    /// ignores its specific value. To take more general actions based on the accumulated weight, consider
68    /// the `threshold` method.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use palimpsest_dataflow::input::Input;
74    /// use palimpsest_dataflow::operators::ThresholdTotal;
75    ///
76    /// ::timely::example(|scope| {
77    ///     // report the number of occurrences of each key
78    ///     scope.new_collection_from(1 .. 10).1
79    ///          .map(|x| x / 3)
80    ///          .distinct_total();
81    /// });
82    /// ```
83    fn distinct_total(&self) -> VecCollection<G, K, isize> {
84        self.distinct_total_core()
85    }
86
87    /// Distinct for general integer differences.
88    ///
89    /// This method allows `distinct` to produce collections whose difference
90    /// type is something other than an `isize` integer, for example perhaps an
91    /// `i32`.
92    fn distinct_total_core<R2: Abelian + From<i8> + 'static>(&self) -> VecCollection<G, K, R2> {
93        self.threshold_total(|_, _| R2::from(1i8))
94    }
95}
96
97impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ThresholdTotal<G, K, R>
98    for VecCollection<G, K, R>
99where
100    G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
101{
102    fn threshold_semigroup<R2, F>(&self, thresh: F) -> VecCollection<G, K, R2>
103    where
104        R2: Semigroup + 'static,
105        F: FnMut(&K, &R, Option<&R>) -> Option<R2> + 'static,
106    {
107        self.arrange_by_self_named("Arrange: ThresholdTotal")
108            .threshold_semigroup(thresh)
109    }
110}
111
112impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
113where
114    G: Scope<Timestamp = T1::Time>,
115    T1: for<'a> TraceReader<
116            Key<'a> = &'a K,
117            Val<'a> = &'a (),
118            Time: TotalOrder,
119            Diff: ExchangeData + Semigroup<T1::DiffGat<'a>>,
120        > + Clone
121        + 'static,
122    K: ExchangeData,
123{
124    fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> VecCollection<G, K, R2>
125    where
126        R2: Semigroup + 'static,
127        F: for<'a> FnMut(T1::Key<'a>, &T1::Diff, Option<&T1::Diff>) -> Option<R2> + 'static,
128    {
129        let mut trace = self.trace.clone();
130
131        self.stream
132            .unary_frontier(Pipeline, "ThresholdTotal", move |_, _| {
133                // tracks the lower and upper limit of received batches.
134                let mut lower_limit = timely::progress::frontier::Antichain::from_elem(
135                    <G::Timestamp as timely::progress::Timestamp>::minimum(),
136                );
137                let mut upper_limit = timely::progress::frontier::Antichain::from_elem(
138                    <G::Timestamp as timely::progress::Timestamp>::minimum(),
139                );
140
141                move |(input, _frontier), output| {
142                    let mut batch_cursors = Vec::new();
143                    let mut batch_storage = Vec::new();
144
145                    // Downgrde previous upper limit to be current lower limit.
146                    lower_limit.clear();
147                    lower_limit.extend(upper_limit.borrow().iter().cloned());
148
149                    let mut cap = None;
150                    input.for_each(|capability, batches| {
151                        if cap.is_none() {
152                            // NB: Assumes batches are in-order
153                            cap = Some(capability.retain());
154                        }
155                        for batch in batches.drain(..) {
156                            upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
157                            batch_cursors.push(batch.cursor());
158                            batch_storage.push(batch);
159                        }
160                    });
161
162                    if let Some(capability) = cap {
163                        let mut session = output.session(&capability);
164
165                        use crate::trace::cursor::CursorList;
166                        let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
167                        let (mut trace_cursor, trace_storage) =
168                            trace.cursor_through(lower_limit.borrow()).unwrap();
169
170                        while let Some(key) = batch_cursor.get_key(&batch_storage) {
171                            let mut count: Option<T1::Diff> = None;
172
173                            // Compute the multiplicity of this key before the current batch.
174                            trace_cursor.seek_key(&trace_storage, key);
175                            if trace_cursor.get_key(&trace_storage) == Some(key) {
176                                trace_cursor.map_times(&trace_storage, |_, diff| {
177                                    count.as_mut().map(|c| c.plus_equals(&diff));
178                                    if count.is_none() {
179                                        count = Some(T1::owned_diff(diff));
180                                    }
181                                });
182                            }
183
184                            // Apply `thresh` both before and after `diff` is applied to `count`.
185                            // If the result is non-zero, send it along.
186                            batch_cursor.map_times(&batch_storage, |time, diff| {
187                                let difference = match &count {
188                                    Some(old) => {
189                                        let mut temp = old.clone();
190                                        temp.plus_equals(&diff);
191                                        thresh(key, &temp, Some(old))
192                                    }
193                                    None => thresh(key, &T1::owned_diff(diff), None),
194                                };
195
196                                // Either add or assign `diff` to `count`.
197                                if let Some(count) = &mut count {
198                                    count.plus_equals(&diff);
199                                } else {
200                                    count = Some(T1::owned_diff(diff));
201                                }
202
203                                if let Some(difference) = difference {
204                                    if !difference.is_zero() {
205                                        session.give((
206                                            key.clone(),
207                                            T1::owned_time(time),
208                                            difference,
209                                        ));
210                                    }
211                                }
212                            });
213
214                            batch_cursor.step_key(&batch_storage);
215                        }
216                    }
217
218                    // tidy up the shared input trace.
219                    trace.advance_upper(&mut upper_limit);
220                    trace.set_logical_compaction(upper_limit.borrow());
221                    trace.set_physical_compaction(upper_limit.borrow());
222                }
223            })
224            .as_collection()
225    }
226}