differential_dataflow/operators/
count.rs1use timely::order::TotalOrder;
4use timely::progress::Timestamp;
5use timely::dataflow::operators::Operator;
6use timely::dataflow::channels::pact::Pipeline;
7
8use crate::lattice::Lattice;
9use crate::{ExchangeData, VecCollection};
10use crate::difference::{IsZero, Semigroup};
11use crate::hashable::Hashable;
12use crate::collection::AsCollection;
13use crate::operators::arrange::Arranged;
14use crate::trace::{BatchReader, Cursor, TraceReader};
15
16pub trait CountTotal<'scope, T: Timestamp + TotalOrder + Lattice + Ord, K: ExchangeData, R: Semigroup> : Sized {
18 fn count_total(self) -> VecCollection<'scope, T, (K, R), isize> {
34 self.count_total_core()
35 }
36
37 fn count_total_core<R2: Semigroup + From<i8> + 'static>(self) -> VecCollection<'scope, T, (K, R), R2>;
43}
44
45impl<'scope, T, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> CountTotal<'scope, T, K, R> for VecCollection<'scope, T, K, R>
46where
47 T: Timestamp + TotalOrder + Lattice + Ord,
48{
49 fn count_total_core<R2: Semigroup + From<i8> + 'static>(self) -> VecCollection<'scope, T, (K, R), R2> {
50 self.arrange_by_self_named("Arrange: CountTotal")
51 .count_total_core()
52 }
53}
54
55impl<'scope, K, Tr> CountTotal<'scope, Tr::Time, K, Tr::Diff> for Arranged<'scope, Tr>
56where
57 Tr: for<'a> TraceReader<
58 Key<'a> = &'a K,
59 Val<'a>=&'a (),
60 Time: TotalOrder,
61 Diff: ExchangeData+Semigroup<Tr::DiffGat<'a>>
62 >+Clone+'static,
63 K: ExchangeData,
64{
65 fn count_total_core<R2: Semigroup + From<i8> + 'static>(self) -> VecCollection<'scope, Tr::Time, (K, Tr::Diff), R2> {
66
67 let mut trace = self.trace.clone();
68
69 self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
70
71 let mut lower_limit = timely::progress::frontier::Antichain::from_elem(Tr::Time::minimum());
73 let mut upper_limit = timely::progress::frontier::Antichain::from_elem(Tr::Time::minimum());
74
75 move |(input, _frontier), output| {
76
77 let mut batch_cursors = Vec::new();
78 let mut batch_storage = Vec::new();
79
80 lower_limit.clear();
82 lower_limit.extend(upper_limit.borrow().iter().cloned());
83
84 let mut cap = None;
85 input.for_each(|capability, batches| {
86 if cap.is_none() { cap = Some(capability.retain(0));
88 }
89 for batch in batches.drain(..) {
90 upper_limit.clone_from(batch.upper()); batch_cursors.push(batch.cursor());
92 batch_storage.push(batch);
93 }
94 });
95
96 if let Some(capability) = cap {
97
98 let mut session = output.session(&capability);
99
100 use crate::trace::cursor::CursorList;
101 let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage);
102 let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap();
103
104 while let Some(key) = batch_cursor.get_key(&batch_storage) {
105 let mut count: Option<Tr::Diff> = None;
106
107 trace_cursor.seek_key(&trace_storage, key);
108 if trace_cursor.get_key(&trace_storage) == Some(key) {
109 trace_cursor.map_times(&trace_storage, |_, diff| {
110 count.as_mut().map(|c| c.plus_equals(&diff));
111 if count.is_none() { count = Some(Tr::owned_diff(diff)); }
112 });
113 }
114
115 batch_cursor.map_times(&batch_storage, |time, diff| {
116
117 if let Some(count) = count.as_ref() {
118 if !count.is_zero() {
119 session.give(((key.clone(), count.clone()), Tr::owned_time(time), R2::from(-1i8)));
120 }
121 }
122 count.as_mut().map(|c| c.plus_equals(&diff));
123 if count.is_none() { count = Some(Tr::owned_diff(diff)); }
124 if let Some(count) = count.as_ref() {
125 if !count.is_zero() {
126 session.give(((key.clone(), count.clone()), Tr::owned_time(time), R2::from(1i8)));
127 }
128 }
129 });
130
131 batch_cursor.step_key(&batch_storage);
132 }
133 }
134
135 trace.advance_upper(&mut upper_limit);
137 trace.set_logical_compaction(upper_limit.borrow());
138 trace.set_physical_compaction(upper_limit.borrow());
139 }
140 })
141 .as_collection()
142 }
143}