differential_dataflow/operators/
count.rs

1//! Count the number of occurrences of each element.
2
3use timely::order::TotalOrder;
4use timely::dataflow::*;
5use timely::dataflow::operators::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8use crate::lattice::Lattice;
9use crate::{ExchangeData, Collection};
10use crate::difference::Semigroup;
11use crate::hashable::Hashable;
12use crate::collection::AsCollection;
13use crate::operators::arrange::{Arranged, ArrangeBySelf};
14use crate::trace::{BatchReader, Cursor, TraceReader};
15
16/// Extension trait for the `count` differential dataflow method.
17pub trait CountTotal<G: Scope, K: ExchangeData, R: Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
18    /// Counts the number of occurrences of each element.
19    ///
20    /// # Examples
21    ///
22    /// ```
23    /// use differential_dataflow::input::Input;
24    /// use differential_dataflow::operators::CountTotal;
25    ///
26    /// ::timely::example(|scope| {
27    ///     // report the number of occurrences of each key
28    ///     scope.new_collection_from(1 .. 10).1
29    ///          .map(|x| x / 3)
30    ///          .count_total();
31    /// });
32    /// ```
33    fn count_total(&self) -> Collection<G, (K, R), isize> {
34        self.count_total_core()
35    }
36
37    /// Count for general integer differences.
38    ///
39    /// This method allows `count_total` to produce collections whose difference
40    /// type is something other than an `isize` integer, for example perhaps an
41    /// `i32`.
42    fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2>;
43}
44
45impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
46where G::Timestamp: TotalOrder+Lattice+Ord {
47    fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2> {
48        self.arrange_by_self_named("Arrange: CountTotal")
49            .count_total_core()
50    }
51}
52
53impl<G: Scope, T1> CountTotal<G, T1::KeyOwned, T1::Diff> for Arranged<G, T1>
54where
55    G::Timestamp: TotalOrder+Lattice+Ord,
56    T1: for<'a> TraceReader<Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
57    T1::KeyOwned: ExchangeData,
58    T1::Diff: ExchangeData+Semigroup,
59{
60    fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::KeyOwned, T1::Diff), R2> {
61
62        let mut trace = self.trace.clone();
63        let mut buffer = Vec::new();
64
65        self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
66
67            // tracks the upper limit of known-complete timestamps.
68            let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
69
70            move |input, output| {
71
72                use crate::trace::cursor::MyTrait;
73                input.for_each(|capability, batches| {
74                    batches.swap(&mut buffer);
75                    let mut session = output.session(&capability);
76                    for batch in buffer.drain(..) {
77                        let mut batch_cursor = batch.cursor();
78                        let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
79                        upper_limit.clone_from(batch.upper());
80
81                        while let Some(key) = batch_cursor.get_key(&batch) {
82                            let mut count: Option<T1::Diff> = None;
83
84                            trace_cursor.seek_key(&trace_storage, key);
85                            if trace_cursor.get_key(&trace_storage) == Some(key) {
86                                trace_cursor.map_times(&trace_storage, |_, diff| {
87                                    count.as_mut().map(|c| c.plus_equals(diff));
88                                    if count.is_none() { count = Some(diff.clone()); }
89                                });
90                            }
91
92                            batch_cursor.map_times(&batch, |time, diff| {
93
94                                if let Some(count) = count.as_ref() {
95                                    if !count.is_zero() {
96                                        session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
97                                    }
98                                }
99                                count.as_mut().map(|c| c.plus_equals(diff));
100                                if count.is_none() { count = Some(diff.clone()); }
101                                if let Some(count) = count.as_ref() {
102                                    if !count.is_zero() {
103                                        session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
104                                    }
105                                }
106                            });
107
108                            batch_cursor.step_key(&batch);
109                        }
110                    }
111                });
112
113                // tidy up the shared input trace.
114                trace.advance_upper(&mut upper_limit);
115                trace.set_logical_compaction(upper_limit.borrow());
116                trace.set_physical_compaction(upper_limit.borrow());
117            }
118        })
119        .as_collection()
120    }
121}