differential_dataflow/operators/
threshold.rs1use timely::order::TotalOrder;
7use timely::dataflow::*;
8use timely::dataflow::operators::Operator;
9use timely::dataflow::channels::pact::Pipeline;
10
11use crate::lattice::Lattice;
12use crate::{ExchangeData, Collection};
13use crate::difference::{Semigroup, Abelian};
14use crate::hashable::Hashable;
15use crate::collection::AsCollection;
16use crate::operators::arrange::{Arranged, ArrangeBySelf};
17use crate::trace::{BatchReader, Cursor, TraceReader};
18
19pub trait ThresholdTotal<G: Scope, K: ExchangeData, R: ExchangeData+Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
21 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
23 where
24 R2: Semigroup,
25 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
26 ;
27 fn threshold_total<R2: Abelian, F: FnMut(&K,&R)->R2+'static>(&self, mut thresh: F) -> Collection<G, K, R2> {
43 self.threshold_semigroup(move |key, new, old| {
44 let mut new = thresh(key, new);
45 if let Some(old) = old { new.plus_equals(&thresh(key, old).negate()); }
46 if !new.is_zero() { Some(new) } else { None }
47 })
48 }
49 fn distinct_total(&self) -> Collection<G, K, isize> {
69 self.distinct_total_core()
70 }
71
72 fn distinct_total_core<R2: Abelian+From<i8>>(&self) -> Collection<G, K, R2> {
78 self.threshold_total(|_,_| R2::from(1i8))
79 }
80
81}
82
83impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> ThresholdTotal<G, K, R> for Collection<G, K, R>
84where G::Timestamp: TotalOrder+Lattice+Ord {
85 fn threshold_semigroup<R2, F>(&self, thresh: F) -> Collection<G, K, R2>
86 where
87 R2: Semigroup,
88 F: FnMut(&K,&R,Option<&R>)->Option<R2>+'static,
89 {
90 self.arrange_by_self_named("Arrange: ThresholdTotal")
91 .threshold_semigroup(thresh)
92 }
93}
94
95impl<G: Scope, K, T1> ThresholdTotal<G, K, T1::Diff> for Arranged<G, T1>
96where
97 G::Timestamp: TotalOrder+Lattice+Ord,
98 T1: for<'a> TraceReader<Key<'a>=&'a K, Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
99 K: ExchangeData,
100 T1::Diff: ExchangeData+Semigroup,
101{
102 fn threshold_semigroup<R2, F>(&self, mut thresh: F) -> Collection<G, K, R2>
103 where
104 R2: Semigroup,
105 F: for<'a> FnMut(T1::Key<'a>,&T1::Diff,Option<&T1::Diff>)->Option<R2>+'static,
106 {
107
108 let mut trace = self.trace.clone();
109 let mut buffer = Vec::new();
110
111 self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
112
113 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
115
116 move |input, output| {
117
118 input.for_each(|capability, batches| {
119 batches.swap(&mut buffer);
120 let mut session = output.session(&capability);
121 for batch in buffer.drain(..) {
122
123 let mut batch_cursor = batch.cursor();
124 let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
125
126 upper_limit.clone_from(batch.upper());
127
128 while let Some(key) = batch_cursor.get_key(&batch) {
129 let mut count: Option<T1::Diff> = None;
130
131 trace_cursor.seek_key(&trace_storage, key);
133 if trace_cursor.get_key(&trace_storage) == Some(key) {
134 trace_cursor.map_times(&trace_storage, |_, diff| {
135 count.as_mut().map(|c| c.plus_equals(diff));
136 if count.is_none() { count = Some(diff.clone()); }
137 });
138 }
139
140 batch_cursor.map_times(&batch, |time, diff| {
143
144 let difference =
145 match &count {
146 Some(old) => {
147 let mut temp = old.clone();
148 temp.plus_equals(diff);
149 thresh(key, &temp, Some(old))
150 },
151 None => { thresh(key, diff, None) },
152 };
153
154 if let Some(count) = &mut count {
156 count.plus_equals(diff);
157 }
158 else {
159 count = Some(diff.clone());
160 }
161
162 if let Some(difference) = difference {
163 if !difference.is_zero() {
164 session.give((key.clone(), time.clone(), difference));
165 }
166 }
167 });
168
169 batch_cursor.step_key(&batch);
170 }
171 }
172 });
173
174 trace.advance_upper(&mut upper_limit);
176 trace.set_logical_compaction(upper_limit.borrow());
177 trace.set_physical_compaction(upper_limit.borrow());
178 }
179 })
180 .as_collection()
181 }
182}