1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
use timely::order::TotalOrder;
use timely::dataflow::*;
use timely::dataflow::operators::Operator;
use timely::dataflow::channels::pact::Pipeline;
use lattice::Lattice;
use ::{ExchangeData, Collection};
use ::difference::Monoid;
use hashable::Hashable;
use collection::AsCollection;
use operators::arrange::{Arranged, ArrangeBySelf};
use trace::{BatchReader, Cursor, TraceReader};
/// Extension trait that adds a `count_total` method.
///
/// The trait is only usable in scopes whose timestamp type satisfies
/// `TotalOrder + Lattice + Ord`.
pub trait CountTotal<G, K, R>
where
    G: Scope,
    G::Timestamp: TotalOrder + Lattice + Ord,
    K: ExchangeData,
    R: Monoid,
{
    /// Produces a collection of `(K, R)` pairs whose differences are `isize`.
    ///
    /// The implementations in this file pair each key with the accumulated
    /// difference (`R`) observed for that key.
    fn count_total(&self) -> Collection<G, (K, R), isize>;
}
/// `count_total` for a plain collection of keys.
///
/// The collection is first arranged by itself, and the work is then handed
/// to the `CountTotal` implementation for `Arranged`.
impl<G, K, R> CountTotal<G, K, R> for Collection<G, K, R>
where
    G: Scope,
    G::Timestamp: TotalOrder + Lattice + Ord,
    K: ExchangeData + Hashable,
    R: ExchangeData + Monoid,
{
    /// Arranges `self` by key and delegates to the arrangement's `count_total`.
    fn count_total(&self) -> Collection<G, (K, R), isize> {
        let arranged = self.arrange_by_self();
        arranged.count_total()
    }
}
/// `count_total` for an existing arrangement keyed by `T1::Key` with unit (`()`) values.
///
/// For every batch arriving on the arrangement's stream, and for every key in that
/// batch, the operator looks the key up in the trace, folds the trace's differences
/// for that key into a running count, and then walks the batch's updates for the key.
/// Each update retracts the previous `(key, count)` pair (when the count is non-zero)
/// and asserts the updated `(key, count)` pair (when the new count is non-zero), both
/// at the update's time.
impl<G: Scope, T1> CountTotal<G, T1::Key, T1::R> for Arranged<G, T1>
where
G::Timestamp: TotalOrder+Lattice+Ord,
T1: TraceReader<Val=(), Time=G::Timestamp>+Clone+'static,
T1::Key: ExchangeData,
T1::R: ExchangeData+Monoid,
T1::Batch: BatchReader<T1::Key, (), G::Timestamp, T1::R>,
T1::Cursor: Cursor<T1::Key, (), G::Timestamp, T1::R>,
{
/// Builds a unary operator named "CountTotal" over `self.stream` and returns its
/// output as a collection of `(key, count)` pairs with `isize` differences.
///
/// # Panics
///
/// Panics if `trace.cursor_through(batch.lower())` does not yield a cursor, because
/// its result is unwrapped.
fn count_total(&self) -> Collection<G, (T1::Key, T1::R), isize> {
// Clone of the trace handle; it is moved into the operator closure below.
let mut trace = self.trace.clone();
// Scratch vector that each incoming message of batches is swapped into.
let mut buffer = Vec::new();
self.stream.unary(Pipeline, "CountTotal", move |_,_| move |input, output| {
input.for_each(|capability, batches| {
batches.swap(&mut buffer);
// All output for these batches is sent under the capability they arrived with.
let mut session = output.session(&capability);
for batch in buffer.drain(..) {
let mut batch_cursor = batch.cursor();
// NOTE(review): `cursor_through(batch.lower())` is presumably a view of the trace
// restricted to updates before this batch's lower bound, so the batch itself is
// not double counted — confirm against `TraceReader::cursor_through`.
let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower()).unwrap();
// Visit every key present in the batch.
while batch_cursor.key_valid(&batch) {
let key = batch_cursor.key(&batch);
// Running count for this key, starting from zero.
let mut count = <T1::R>::zero();
// Seek the trace cursor to `key`. The equality check guards against the cursor
// resting on a different key after the seek; only a matching key contributes.
trace_cursor.seek_key(&trace_storage, key);
if trace_cursor.key_valid(&trace_storage) && trace_cursor.key(&trace_storage) == key {
trace_cursor.map_times(&trace_storage, |_, diff| count += diff);
}
// Apply the batch's `(time, diff)` updates for this key one at a time.
batch_cursor.map_times(&batch, |time, diff| {
// Retract the previous count, unless it was zero (a zero count is never emitted).
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), -1));
}
count += diff;
// Assert the updated count, unless it is zero.
if !count.is_zero() {
session.give(((key.clone(), count.clone()), time.clone(), 1));
}
});
batch_cursor.step_key(&batch);
}
// Report the batch's upper bound to the trace handle.
// NOTE(review): presumably this lets the trace compact history this operator no
// longer needs — confirm against `TraceReader::advance_by` / `distinguish_since`.
trace.advance_by(batch.upper());
trace.distinguish_since(batch.upper());
}
});
})
.as_collection()
}
}