palimpsest_dataflow/operators/
count.rs1use timely::dataflow::channels::pact::Pipeline;
4use timely::dataflow::operators::Operator;
5use timely::dataflow::*;
6use timely::order::TotalOrder;
7
8use crate::collection::AsCollection;
9use crate::difference::{IsZero, Semigroup};
10use crate::hashable::Hashable;
11use crate::lattice::Lattice;
12use crate::operators::arrange::{ArrangeBySelf, Arranged};
13use crate::trace::{BatchReader, Cursor, TraceReader};
14use crate::{ExchangeData, VecCollection};
15
16pub trait CountTotal<G: Scope<Timestamp: TotalOrder + Lattice + Ord>, K: ExchangeData, R: Semigroup>
18{
19 fn count_total(&self) -> VecCollection<G, (K, R), isize> {
35 self.count_total_core()
36 }
37
38 fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2>;
44}
45
46impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> CountTotal<G, K, R>
47 for VecCollection<G, K, R>
48where
49 G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
50{
51 fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2> {
52 self.arrange_by_self_named("Arrange: CountTotal")
53 .count_total_core()
54 }
55}
56
57impl<G, K, T1> CountTotal<G, K, T1::Diff> for Arranged<G, T1>
58where
59 G: Scope<Timestamp = T1::Time>,
60 T1: for<'a> TraceReader<
61 Key<'a> = &'a K,
62 Val<'a> = &'a (),
63 Time: TotalOrder,
64 Diff: ExchangeData + Semigroup<T1::DiffGat<'a>>,
65 > + Clone
66 + 'static,
67 K: ExchangeData,
68{
69 fn count_total_core<R2: Semigroup + From<i8> + 'static>(
70 &self,
71 ) -> VecCollection<G, (K, T1::Diff), R2> {
72 let mut trace = self.trace.clone();
73
74 self.stream
75 .unary_frontier(Pipeline, "CountTotal", move |_, _| {
76 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(
78 <G::Timestamp as timely::progress::Timestamp>::minimum(),
79 );
80 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(
81 <G::Timestamp as timely::progress::Timestamp>::minimum(),
82 );
83
84 move |(input, _frontier), output| {
85 let mut batch_cursors = Vec::new();
86 let mut batch_storage = Vec::new();
87
88 lower_limit.clear();
90 lower_limit.extend(upper_limit.borrow().iter().cloned());
91
92 let mut cap = None;
93 input.for_each(|capability, batches| {
94 if cap.is_none() {
95 cap = Some(capability.retain());
97 }
98 for batch in batches.drain(..) {
99 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
101 batch_storage.push(batch);
102 }
103 });
104
105 if let Some(capability) = cap {
106 let mut session = output.session(&capability);
107
108 use crate::trace::cursor::CursorList;
109 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
110 let (mut trace_cursor, trace_storage) =
111 trace.cursor_through(lower_limit.borrow()).unwrap();
112
113 while let Some(key) = batch_cursor.get_key(&batch_storage) {
114 let mut count: Option<T1::Diff> = None;
115
116 trace_cursor.seek_key(&trace_storage, key);
117 if trace_cursor.get_key(&trace_storage) == Some(key) {
118 trace_cursor.map_times(&trace_storage, |_, diff| {
119 count.as_mut().map(|c| c.plus_equals(&diff));
120 if count.is_none() {
121 count = Some(T1::owned_diff(diff));
122 }
123 });
124 }
125
126 batch_cursor.map_times(&batch_storage, |time, diff| {
127 if let Some(count) = count.as_ref() {
128 if !count.is_zero() {
129 session.give((
130 (key.clone(), count.clone()),
131 T1::owned_time(time),
132 R2::from(-1i8),
133 ));
134 }
135 }
136 count.as_mut().map(|c| c.plus_equals(&diff));
137 if count.is_none() {
138 count = Some(T1::owned_diff(diff));
139 }
140 if let Some(count) = count.as_ref() {
141 if !count.is_zero() {
142 session.give((
143 (key.clone(), count.clone()),
144 T1::owned_time(time),
145 R2::from(1i8),
146 ));
147 }
148 }
149 });
150
151 batch_cursor.step_key(&batch_storage);
152 }
153 }
154
155 trace.advance_upper(&mut upper_limit);
157 trace.set_logical_compaction(upper_limit.borrow());
158 trace.set_physical_compaction(upper_limit.borrow());
159 }
160 })
161 .as_collection()
162 }
163}