differential_dataflow/operators/
threshold.rs1use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19pub trait ThresholdTotal<G: Scope<Timestamp: TotalOrder+Lattice+Ord>, K: ExchangeData, R: ExchangeData+Semigroup> {
21 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23 where
24 R2: Semigroup+'static,
25 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26 ;
27 fn threshold_total<R2: Abelian+'static, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43 self.threshold_semigroup(move |key, new, old| {
44 let mut new = thresh(key, new);
45 if let Some(old) = old {
46 let mut add = thresh(key, old);
47 add.negate();
48 new.plus_equals(&add);
49 }
50 if !new.is_zero() { Some(new) } else { None }
51 })
52 }
53 fn distinct_total(&self) -> Collection<G, K, isize> {
73 self.distinct_total_core()
74 }
75
76 fn distinct_total_core<R2: Abelian+From<i8>+'static>(&self) -> Collection<G, K, R2> {
82 self.threshold_total(|_,_| R2::from(1i8))
83 }
84
85}
86
87impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
88where
89 G: Scope<Timestamp: TotalOrder+Lattice+Ord>,
90{
91 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
92 where
93 R2: Semigroup+'static,
94 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
95 {
96 self.arrange_by_self_named("Arrange: ThresholdTotal")
97 .threshold_semigroup(thresh)
98 }
99}
100
101impl<G, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
102where
103 G: Scope<Timestamp=T1::Time>,
104 T1: for<'a> TraceReader<
105 Key<'a>=&'a K,
106 Val<'a>=&'a (),
107 Time: TotalOrder,
108 Diff : ExchangeData + Semigroup<T1::DiffGat<'a>>,
109 >+Clone+'static,
110 K: ExchangeData,
111{
112 fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
113 where
114 R2: Semigroup+'static,
115 F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
116 {
117
118 let mut trace = self.trace.clone();
119
120 self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
121
122 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
124 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
125
126 move |input, output| {
127
128 let mut batch_cursors = Vec::new();
129 let mut batch_storage = Vec::new();
130
131 lower_limit.clear();
133 lower_limit.extend(upper_limit.borrow().iter().cloned());
134
135 let mut cap = None;
136 input.for_each(|capability, batches| {
137 if cap.is_none() { cap = Some(capability.retain());
139 }
140 for batch in batches.drain(..) {
141 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
143 batch_storage.push(batch);
144 }
145 });
146
147 if let Some(capability) = cap {
148
149 let mut session = output.session(&capability);
150
151 use crate::trace::cursor::CursorList;
152 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
153 let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
154
155 while let Some(key) = batch_cursor.get_key(&batch_storage) {
156 let mut count: Option<T1::Diff> = None;
157
158 trace_cursor.seek_key(&trace_storage, key);
160 if trace_cursor.get_key(&trace_storage) == Some(key) {
161 trace_cursor.map_times(&trace_storage, |_, diff| {
162 count.as_mut().map(|c| c.plus_equals(&diff));
163 if count.is_none() { count = Some(T1::owned_diff(diff)); }
164 });
165 }
166
167 batch_cursor.map_times(&batch_storage, |time, diff| {
170
171 let difference =
172 match &count {
173 Some(old) => {
174 let mut temp = old.clone();
175 temp.plus_equals(&diff);
176 thresh(key, &temp, Some(old))
177 },
178 None => { thresh(key, &T1::owned_diff(diff), None) },
179 };
180
181 if let Some(count) = &mut count {
183 count.plus_equals(&diff);
184 }
185 else {
186 count = Some(T1::owned_diff(diff));
187 }
188
189 if let Some(difference) = difference {
190 if !difference.is_zero() {
191 session.give((key.clone(), T1::owned_time(time), difference));
192 }
193 }
194 });
195
196 batch_cursor.step_key(&batch_storage);
197 }
198 }
199
200 trace.advance_upper(&mut upper_limit);
202 trace.set_logical_compaction(upper_limit.borrow());
203 trace.set_physical_compaction(upper_limit.borrow());
204 }
205 })
206 .as_collection()
207 }
208}