Skip to main content

palimpsest_dataflow/operators/
count.rs

//! Count the number of occurrences of each element.
2
3use timely::dataflow::channels::pact::Pipeline;
4use timely::dataflow::operators::Operator;
5use timely::dataflow::*;
6use timely::order::TotalOrder;
7
8use crate::collection::AsCollection;
9use crate::difference::{IsZero, Semigroup};
10use crate::hashable::Hashable;
11use crate::lattice::Lattice;
12use crate::operators::arrange::{ArrangeBySelf, Arranged};
13use crate::trace::{BatchReader, Cursor, TraceReader};
14use crate::{ExchangeData, VecCollection};
15
16/// Extension trait for the `count` differential dataflow method.
17pub trait CountTotal<G: Scope<Timestamp: TotalOrder + Lattice + Ord>, K: ExchangeData, R: Semigroup>
18{
19    /// Counts the number of occurrences of each element.
20    ///
21    /// # Examples
22    ///
23    /// ```
24    /// use palimpsest_dataflow::input::Input;
25    /// use palimpsest_dataflow::operators::CountTotal;
26    ///
27    /// ::timely::example(|scope| {
28    ///     // report the number of occurrences of each key
29    ///     scope.new_collection_from(1 .. 10).1
30    ///          .map(|x| x / 3)
31    ///          .count_total();
32    /// });
33    /// ```
34    fn count_total(&self) -> VecCollection<G, (K, R), isize> {
35        self.count_total_core()
36    }
37
38    /// Count for general integer differences.
39    ///
40    /// This method allows `count_total` to produce collections whose difference
41    /// type is something other than an `isize` integer, for example perhaps an
42    /// `i32`.
43    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2>;
44}
45
46impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> CountTotal<G, K, R>
47    for VecCollection<G, K, R>
48where
49    G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
50{
51    fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2> {
52        self.arrange_by_self_named("Arrange: CountTotal")
53            .count_total_core()
54    }
55}
56
57impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
58where
59    G: Scope<Timestamp = T1::Time>,
60    T1: for<'a> TraceReader<
61            Key<'a> = &'a K,
62            Val<'a> = &'a (),
63            Time: TotalOrder,
64            Diff: ExchangeData + Semigroup<T1::DiffGat<'a>>,
65        > + Clone
66        + 'static,
67    K: ExchangeData,
68{
69    fn count_total_core<R2: Semigroup + From<i8> + 'static>(
70        &self,
71    ) -> VecCollection<G, (K, T1::Diff), R2> {
72        let mut trace = self.trace.clone();
73
74        self.stream
75            .unary_frontier(Pipeline, "CountTotal", move |_, _| {
76                // tracks the lower and upper limit of received batches.
77                let mut lower_limit = timely::progress::frontier::Antichain::from_elem(
78                    <G::Timestamp as timely::progress::Timestamp>::minimum(),
79                );
80                let mut upper_limit = timely::progress::frontier::Antichain::from_elem(
81                    <G::Timestamp as timely::progress::Timestamp>::minimum(),
82                );
83
84                move |(input, _frontier), output| {
85                    let mut batch_cursors = Vec::new();
86                    let mut batch_storage = Vec::new();
87
88                    // Downgrade previous upper limit to be current lower limit.
89                    lower_limit.clear();
90                    lower_limit.extend(upper_limit.borrow().iter().cloned());
91
92                    let mut cap = None;
93                    input.for_each(|capability, batches| {
94                        if cap.is_none() {
95                            // NB: Assumes batches are in-order
96                            cap = Some(capability.retain());
97                        }
98                        for batch in batches.drain(..) {
99                            upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
100                            batch_cursors.push(batch.cursor());
101                            batch_storage.push(batch);
102                        }
103                    });
104
105                    if let Some(capability) = cap {
106                        let mut session = output.session(&capability);
107
108                        use crate::trace::cursor::CursorList;
109                        let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
110                        let (mut trace_cursor, trace_storage) =
111                            trace.cursor_through(lower_limit.borrow()).unwrap();
112
113                        while let Some(key) = batch_cursor.get_key(&batch_storage) {
114                            let mut count: Option<T1::Diff> = None;
115
116                            trace_cursor.seek_key(&trace_storage, key);
117                            if trace_cursor.get_key(&trace_storage) == Some(key) {
118                                trace_cursor.map_times(&trace_storage, |_, diff| {
119                                    count.as_mut().map(|c| c.plus_equals(&diff));
120                                    if count.is_none() {
121                                        count = Some(T1::owned_diff(diff));
122                                    }
123                                });
124                            }
125
126                            batch_cursor.map_times(&batch_storage, |time, diff| {
127                                if let Some(count) = count.as_ref() {
128                                    if !count.is_zero() {
129                                        session.give((
130                                            (key.clone(), count.clone()),
131                                            T1::owned_time(time),
132                                            R2::from(-1i8),
133                                        ));
134                                    }
135                                }
136                                count.as_mut().map(|c| c.plus_equals(&diff));
137                                if count.is_none() {
138                                    count = Some(T1::owned_diff(diff));
139                                }
140                                if let Some(count) = count.as_ref() {
141                                    if !count.is_zero() {
142                                        session.give((
143                                            (key.clone(), count.clone()),
144                                            T1::owned_time(time),
145                                            R2::from(1i8),
146                                        ));
147                                    }
148                                }
149                            });
150
151                            batch_cursor.step_key(&batch_storage);
152                        }
153                    }
154
155                    // tidy up the shared input trace.
156                    trace.advance_upper(&mut upper_limit);
157                    trace.set_logical_compaction(upper_limit.borrow());
158                    trace.set_physical_compaction(upper_limit.borrow());
159                }
160            })
161            .as_collection()
162    }
163}