differential_dataflow/operators/
threshold.rs1use timely::order::TotalOrder;
7use timely::progress::Timestamp;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, VecCollection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::Arranged;
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19pub trait ThresholdTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: ExchangeData+Semigroup> : Sized {
21 fn threshold_semigroup<R2, F>(self, thresh: F) -> VecCollection<'scope, T, K, R2>
23 where
24 R2: Semigroup+'static,
25 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26 ;
27 fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(self, mut thresh: F) -> VecCollection<'scope, T, K, R2> {
43 self.threshold_semigroup(move |key, new, old| {
44 let mut new = thresh(key, new);
45 if let Some(old) = old {
46 let mut add = thresh(key, old);
47 add.negate();
48 new.plus_equals(&add);
49 }
50 if !new.is_zero() { Some(new) } else { None }
51 })
52 }
53 fn distinct_total(self) -> VecCollection<'scope, T, K, isize> {
73 self.distinct_total_core()
74 }
75
76 fn distinct_total_core<R2: Abelian+From<i8>+'static>(self) -> VecCollection<'scope, T, K, R2> {
82 self.threshold_total(|_,_| R2::from(1i8))
83 }
84
85}
86
87impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R>
88where
89 T: Timestamp + TotalOrder + Lattice + Ord,
90{
91 fn threshold_semigroup<R2, F>(self, thresh: F) -> VecCollection<'scope, T, K, R2>
92 where
93 R2: Semigroup+'static,
94 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
95 {
96 self.arrange_by_self_named("Arrange: ThresholdTotal")
97 .threshold_semigroup(thresh)
98 }
99}
100
101impl<'scope, K, Tr> ThresholdTotal<'scope, Tr::Time, K, Tr::Diff> for Arranged<'scope, Tr>
102where
103 Tr: for<'a> TraceReader<
104 Key<'a>=&'a K,
105 Val<'a>=&'a (),
106 Time: TotalOrder,
107 Diff : ExchangeData + Semigroup<Tr::DiffGat<'a>>,
108 >+Clone+'static,
109 K: ExchangeData,
110{
111 fn threshold_semigroup<R2, F>(self, mut thresh: F) -> VecCollection<'scope, Tr::Time, K, R2>
112 where
113 R2: Semigroup+'static,
114 F: for<'a> FnMut(Tr::Key<'a>,&Tr::Diff,Option<&Tr::Diff>)->Option<R2>+'static,
115 {
116
117 let mut trace = self.trace.clone();
118
119 self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
120
121 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(Tr::Time::minimum());
123 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(Tr::Time::minimum());
124
125 move |(input, _frontier), output| {
126
127 let mut batch_cursors = Vec::new();
128 let mut batch_storage = Vec::new();
129
130 lower_limit.clear();
132 lower_limit.extend(upper_limit.borrow().iter().cloned());
133
134 let mut cap = None;
135 input.for_each(|capability, batches| {
136 if cap.is_none() { cap = Some(capability.retain(0));
138 }
139 for batch in batches.drain(..) {
140 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
142 batch_storage.push(batch);
143 }
144 });
145
146 if let Some(capability) = cap {
147
148 let mut session = output.session(&capability);
149
150 use crate::trace::cursor::CursorList;
151 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
152 let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
153
154 while let Some(key) = batch_cursor.get_key(&batch_storage) {
155 let mut count: Option<Tr::Diff> = None;
156
157 trace_cursor.seek_key(&trace_storage, key);
159 if trace_cursor.get_key(&trace_storage) == Some(key) {
160 trace_cursor.map_times(&trace_storage, |_, diff| {
161 count.as_mut().map(|c| c.plus_equals(&diff));
162 if count.is_none() { count = Some(Tr::owned_diff(diff)); }
163 });
164 }
165
166 batch_cursor.map_times(&batch_storage, |time, diff| {
169
170 let difference =
171 match &count {
172 Some(old) => {
173 let mut temp = old.clone();
174 temp.plus_equals(&diff);
175 thresh(key, &temp, Some(old))
176 },
177 None => { thresh(key, &Tr::owned_diff(diff), None) },
178 };
179
180 if let Some(count) = &mut count {
182 count.plus_equals(&diff);
183 }
184 else {
185 count = Some(Tr::owned_diff(diff));
186 }
187
188 if let Some(difference) = difference {
189 if !difference.is_zero() {
190 session.give((key.clone(), Tr::owned_time(time), difference));
191 }
192 }
193 });
194
195 batch_cursor.step_key(&batch_storage);
196 }
197 }
198
199 trace.advance_upper(&mut upper_limit);
201 trace.set_logical_compaction(upper_limit.borrow());
202 trace.set_physical_compaction(upper_limit.borrow());
203 }
204 })
205 .as_collection()
206 }
207}