palimpsest_dataflow/operators/
threshold.rs1use timely::dataflow::channels::pact::Pipeline;
7use timely::dataflow::operators::Operator;
8use timely::dataflow::*;
9use timely::order::TotalOrder;
10
11use crate::collection::AsCollection;
12use crate::difference::{Abelian, Semigroup};
13use crate::hashable::Hashable;
14use crate::lattice::Lattice;
15use crate::operators::arrange::{ArrangeBySelf, Arranged};
16use crate::trace::{BatchReader, Cursor, TraceReader};
17use crate::{ExchangeData, VecCollection};
18
19pub trait ThresholdTotal<
21 G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
22 K: ExchangeData,
23 R: ExchangeData + Semigroup,
24>
25{
26 fn threshold_semigroup<R2, F>(&self, thresh: F) -> VecCollection<G, K, R2>
28 where
29 R2: Semigroup + 'static,
30 F: FnMut(&K, &R, Option<&R>) -> Option<R2> + 'static;
31 fn threshold_total<R2: Abelian + 'static, F: FnMut(&K, &R) -> R2 + 'static>(
47 &self,
48 mut thresh: F,
49 ) -> VecCollection<G, K, R2> {
50 self.threshold_semigroup(move |key, new, old| {
51 let mut new = thresh(key, new);
52 if let Some(old) = old {
53 let mut add = thresh(key, old);
54 add.negate();
55 new.plus_equals(&add);
56 }
57 if !new.is_zero() {
58 Some(new)
59 } else {
60 None
61 }
62 })
63 }
64 fn distinct_total(&self) -> VecCollection<G, K, isize> {
84 self.distinct_total_core()
85 }
86
87 fn distinct_total_core<R2: Abelian + From<i8> + 'static>(&self) -> VecCollection<G, K, R2> {
93 self.threshold_total(|_, _| R2::from(1i8))
94 }
95}
96
97impl<G: Scope, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> ThresholdTotal<G, K, R>
98 for VecCollection<G, K, R>
99where
100 G: Scope<Timestamp: TotalOrder + Lattice + Ord>,
101{
102 fn threshold_semigroup<R2, F>(&self, thresh: F) -> VecCollection<G, K, R2>
103 where
104 R2: Semigroup + 'static,
105 F: FnMut(&K, &R, Option<&R>) -> Option<R2> + 'static,
106 {
107 self.arrange_by_self_named("Arrange: ThresholdTotal")
108 .threshold_semigroup(thresh)
109 }
110}
111
112impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
113where
114 G: Scope<Timestamp = T1::Time>,
115 T1: for<'a> TraceReader<
116 Key<'a> = &'a K,
117 Val<'a> = &'a (),
118 Time: TotalOrder,
119 Diff: ExchangeData + Semigroup<T1::DiffGat<'a>>,
120 > + Clone
121 + 'static,
122 K: ExchangeData,
123{
124 fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> VecCollection<G, K, R2>
125 where
126 R2: Semigroup + 'static,
127 F: for<'a> FnMut(T1::Key<'a>, &T1::Diff, Option<&T1::Diff>) -> Option<R2> + 'static,
128 {
129 let mut trace = self.trace.clone();
130
131 self.stream
132 .unary_frontier(Pipeline, "ThresholdTotal", move |_, _| {
133 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(
135 <G::Timestamp as timely::progress::Timestamp>::minimum(),
136 );
137 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(
138 <G::Timestamp as timely::progress::Timestamp>::minimum(),
139 );
140
141 move |(input, _frontier), output| {
142 let mut batch_cursors = Vec::new();
143 let mut batch_storage = Vec::new();
144
145 lower_limit.clear();
147 lower_limit.extend(upper_limit.borrow().iter().cloned());
148
149 let mut cap = None;
150 input.for_each(|capability, batches| {
151 if cap.is_none() {
152 cap = Some(capability.retain());
154 }
155 for batch in batches.drain(..) {
156 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
158 batch_storage.push(batch);
159 }
160 });
161
162 if let Some(capability) = cap {
163 let mut session = output.session(&capability);
164
165 use crate::trace::cursor::CursorList;
166 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
167 let (mut trace_cursor, trace_storage) =
168 trace.cursor_through(lower_limit.borrow()).unwrap();
169
170 while let Some(key) = batch_cursor.get_key(&batch_storage) {
171 let mut count: Option<T1::Diff> = None;
172
173 trace_cursor.seek_key(&trace_storage, key);
175 if trace_cursor.get_key(&trace_storage) == Some(key) {
176 trace_cursor.map_times(&trace_storage, |_, diff| {
177 count.as_mut().map(|c| c.plus_equals(&diff));
178 if count.is_none() {
179 count = Some(T1::owned_diff(diff));
180 }
181 });
182 }
183
184 batch_cursor.map_times(&batch_storage, |time, diff| {
187 let difference = match &count {
188 Some(old) => {
189 let mut temp = old.clone();
190 temp.plus_equals(&diff);
191 thresh(key, &temp, Some(old))
192 }
193 None => thresh(key, &T1::owned_diff(diff), None),
194 };
195
196 if let Some(count) = &mut count {
198 count.plus_equals(&diff);
199 } else {
200 count = Some(T1::owned_diff(diff));
201 }
202
203 if let Some(difference) = difference {
204 if !difference.is_zero() {
205 session.give((
206 key.clone(),
207 T1::owned_time(time),
208 difference,
209 ));
210 }
211 }
212 });
213
214 batch_cursor.step_key(&batch_storage);
215 }
216 }
217
218 trace.advance_upper(&mut upper_limit);
220 trace.set_logical_compaction(upper_limit.borrow());
221 trace.set_physical_compaction(upper_limit.borrow());
222 }
223 })
224 .as_collection()
225 }
226}