differential_dataflow/operators/
threshold.rs

//! Reduce the collection to one occurrence of each distinct element, or more generally
//! transform each element's accumulated weight.
//!
//! The `threshold_total` and `distinct_total` operators are optimizations of the more general
//! `threshold` and `distinct` operators for the case in which time is totally ordered.
5
6use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19/// Extension trait for the `distinct` differential dataflow method.
20pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
21    /// Reduces the collection to one occurrence of each distinct element.
22    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23    where
24        R2: Semigroup,
25        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26        ;
27    /// Reduces the collection to one occurrence of each distinct element.
28    ///
29    /// # Examples
30    ///
31    /// ```
32    /// use differential_dataflow::input::Input;
33    /// use differential_dataflow::operators::ThresholdTotal;
34    ///
35    /// ::timely::example(|scope| {
36    ///     // report the number of occurrences of each key
37    ///     scope.new_collection_from(1 .. 10).1
38    ///          .map(|x| x / 3)
39    ///          .threshold_total(|_,c| c % 2);
40    /// });
41    /// ```
42    fn threshold_total<R2: Abelian, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43        self.threshold_semigroup(move |key, new, old| {
44            let mut new = thresh(key, new);
45            if let Some(old) = old { new.plus_equals(&thresh(key, old).negate()); }
46            if !new.is_zero() { Some(new) } else { None }
47        })
48    }
49    /// Reduces the collection to one occurrence of each distinct element.
50    ///
51    /// This reduction only tests whether the weight associated with a record is non-zero, and otherwise
52    /// ignores its specific value. To take more general actions based on the accumulated weight, consider
53    /// the `threshold` method.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use differential_dataflow::input::Input;
59    /// use differential_dataflow::operators::ThresholdTotal;
60    ///
61    /// ::timely::example(|scope| {
62    ///     // report the number of occurrences of each key
63    ///     scope.new_collection_from(1 .. 10).1
64    ///          .map(|x| x / 3)
65    ///          .distinct_total();
66    /// });
67    /// ```
68    fn distinct_total(&self) -> Collection<G, K, isize> {
69        self.distinct_total_core()
70    }
71
72    /// Distinct for general integer differences.
73    ///
74    /// This method allows `distinct` to produce collections whose difference
75    /// type is something other than an `isize` integer, for example perhaps an
76    /// `i32`.
77    fn distinct_total_core<R2: Abelian+From<i8>>(&self) -> Collection<G, K, R2> {
78        self.threshold_total(|_,_| R2::from(1i8))
79    }
80
81}
82
83impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
84where G::Timestamp: TotalOrder+Lattice+Ord {
85    fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
86    where
87        R2: Semigroup,
88        F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
89    {
90        self.arrange_by_self_named("Arrange: ThresholdTotal")
91            .threshold_semigroup(thresh)
92    }
93}
94
95impl<G: Scope, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
96where
97    G::Timestamp: TotalOrder+Lattice+Ord,
98    T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
99    K: ExchangeData,
100    T1::Diff: ExchangeData+Semigroup,
101{
102    fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
103    where
104        R2: Semigroup,
105        F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
106    {
107
108        let mut trace = self.trace.clone();
109        let mut buffer = Vec::new();
110
111        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
112
113            // tracks the upper limit of known-complete timestamps.
114            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
115
116            move |input, output| {
117
118                input.for_each(|capability, batches| {
119                    batches.swap(&mut buffer);
120                    let mut session = output.session(&capability);
121                    for batch in buffer.drain(..) {
122
123                        let mut batch_cursor = batch.cursor();
124                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
125
126                        upper_limit.clone_from(batch.upper());
127
128                        while let Some(key) = batch_cursor.get_key(&batch) {
129                            let mut count: Option<T1::Diff> = None;
130
131                            // Compute the multiplicity of this key before the current batch.
132                            trace_cursor.seek_key(&trace_storage, key);
133                            if trace_cursor.get_key(&trace_storage) == Some(key) {
134                                trace_cursor.map_times(&trace_storage, |_, diff| {
135                                    count.as_mut().map(|c| c.plus_equals(diff));
136                                    if count.is_none() { count = Some(diff.clone()); }
137                                });
138                            }
139
140                            // Apply `thresh` both before and after `diff` is applied to `count`.
141                            // If the result is non-zero, send it along.
142                            batch_cursor.map_times(&batch, |time, diff| {
143
144                                let difference =
145                                match &count {
146                                    Some(old) => {
147                                        let mut temp = old.clone();
148                                        temp.plus_equals(diff);
149                                        thresh(key, &temp, Some(old))
150                                    },
151                                    None => { thresh(key, diff, None) },
152                                };
153
154                                // Either add or assign `diff` to `count`.
155                                if let Some(count) = &mut count {
156                                    count.plus_equals(diff);
157                                }
158                                else {
159                                    count = Some(diff.clone());
160                                }
161
162                                if let Some(difference) = difference {
163                                    if !difference.is_zero() {
164                                        session.give((key.clone(), time.clone(), difference));
165                                    }
166                                }
167                            });
168
169                            batch_cursor.step_key(&batch);
170                        }
171                    }
172                });
173
174                // tidy up the shared input trace.
175                trace.advance_upper(&mut upper_limit);
176                trace.set_logical_compaction(upper_limit.borrow());
177                trace.set_physical_compaction(upper_limit.borrow());
178            }
179        })
180        .as_collection()
181    }
182}