differential_dataflow/operators/
count.rs1use timely::order::TotalOrder;
4use timely::dataflow::*;
5use timely::dataflow::operators::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8use crate::lattice::Lattice;
9use crate::{ExchangeData, Collection};
10use crate::difference::Semigroup;
11use crate::hashable::Hashable;
12use crate::collection::AsCollection;
13use crate::operators::arrange::{Arranged, ArrangeBySelf};
14use crate::trace::{BatchReader, Cursor, TraceReader};
15
16pub trait CountTotal<G: Scope, K: ExchangeData, R: Semigroup> where G::Timestamp: TotalOrder+Lattice+Ord {
18 fn count_total(&self) -> Collection<G, (K, R), isize> {
34 self.count_total_core()
35 }
36
37 fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2>;
43}
44
45impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<G, K, R> for Collection<G, K, R>
46where G::Timestamp: TotalOrder+Lattice+Ord {
47 fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (K, R), R2> {
48 self.arrange_by_self_named("Arrange: CountTotal")
49 .count_total_core()
50 }
51}
52
53impl<G: Scope, T1> CountTotal<G, T1::KeyOwned, T1::Diff> for Arranged<G, T1>
54where
55 G::Timestamp: TotalOrder+Lattice+Ord,
56 T1: for<'a> TraceReader<Val<'a>=&'a (), Time=G::Timestamp>+Clone+'static,
57 T1::KeyOwned: ExchangeData,
58 T1::Diff: ExchangeData+Semigroup,
59{
60 fn count_total_core<R2: Semigroup + From<i8>>(&self) -> Collection<G, (T1::KeyOwned, T1::Diff), R2> {
61
62 let mut trace = self.trace.clone();
63 let mut buffer = Vec::new();
64
65 self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
66
67 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
69
70 move |input, output| {
71
72 use crate::trace::cursor::MyTrait;
73 input.for_each(|capability, batches| {
74 batches.swap(&mut buffer);
75 let mut session = output.session(&capability);
76 for batch in buffer.drain(..) {
77 let mut batch_cursor = batch.cursor();
78 let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap();
79 upper_limit.clone_from(batch.upper());
80
81 while let Some(key) = batch_cursor.get_key(&batch) {
82 let mut count: Option<T1::Diff> = None;
83
84 trace_cursor.seek_key(&trace_storage, key);
85 if trace_cursor.get_key(&trace_storage) == Some(key) {
86 trace_cursor.map_times(&trace_storage, |_, diff| {
87 count.as_mut().map(|c| c.plus_equals(diff));
88 if count.is_none() { count = Some(diff.clone()); }
89 });
90 }
91
92 batch_cursor.map_times(&batch, |time, diff| {
93
94 if let Some(count) = count.as_ref() {
95 if !count.is_zero() {
96 session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(-1i8)));
97 }
98 }
99 count.as_mut().map(|c| c.plus_equals(diff));
100 if count.is_none() { count = Some(diff.clone()); }
101 if let Some(count) = count.as_ref() {
102 if !count.is_zero() {
103 session.give(((key.into_owned(), count.clone()), time.clone(), R2::from(1i8)));
104 }
105 }
106 });
107
108 batch_cursor.step_key(&batch);
109 }
110 }
111 });
112
113 trace.advance_upper(&mut upper_limit);
115 trace.set_logical_compaction(upper_limit.borrow());
116 trace.set_physical_compaction(upper_limit.borrow());
117 }
118 })
119 .as_collection()
120 }
121}