1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
use std::rc::Rc;
use std::cell::RefCell;
use progress::Timestamp;
use progress::ChangeBatch;
use progress::frontier::MutableAntichain;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::{Buffer, Session};
use dataflow::channels::Content;
use timely_communication::{Push, Pull};
use dataflow::operators::Capability;
use dataflow::operators::capability::mint as mint_capability;
/// Handle through which incoming `(time, content)` messages are pulled.
///
/// Holds the `PullCounter` that supplies messages and the shared change batch
/// that is handed to every capability minted for a received message.
pub struct InputHandle<T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)>,
{
    /// Source of incoming messages.
    pull_counter: PullCounter<T, D, P>,
    /// Shared change batch passed to `mint_capability` for each message.
    internal: Rc<RefCell<ChangeBatch<T>>>,
}
/// An `InputHandle` borrowed together with a frontier.
///
/// Both references share the lifetime `'a`.
pub struct FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Pull<(T, Content<D>)> + 'a,
{
    /// The underlying input handle, borrowed mutably.
    pub handle: &'a mut InputHandle<T, D, P>,
    /// The frontier exposed alongside the input.
    pub frontier: &'a MutableAntichain<T>,
}
impl<T: Timestamp, D, P: Pull<(T, Content<D>)>> InputHandle<T, D, P> {
    /// Pulls the next message, if any, paired with a freshly minted capability.
    ///
    /// The capability is minted from a clone of the message's time and a clone
    /// of this handle's shared `internal` change batch. Returns `None` when the
    /// underlying pull counter yields nothing.
    #[inline(always)]
    pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
        // A shared borrow is enough: the `Rc` is only cloned, never mutated.
        let internal = &self.internal;
        self.pull_counter.next().map(|(time, content)| {
            (mint_capability(time.clone(), internal.clone()), content)
        })
    }

    /// Repeatedly calls `next` and hands each `(capability, content)` pair to
    /// `logic`, stopping as soon as `next` returns `None`.
    ///
    /// Each invocation of `logic` is bracketed by a pair of `GUARDED_MESSAGE`
    /// log events (`true` before the call, `false` after it).
    #[inline]
    pub fn for_each<F: FnMut(Capability<T>, &mut Content<D>)>(&mut self, mut logic: F) {
        while let Some((cap, data)) = self.next() {
            ::logging::log(&::logging::GUARDED_MESSAGE, true);
            logic(cap, data);
            ::logging::log(&::logging::GUARDED_MESSAGE, false);
        }
    }
}
impl<'a, T, D, P> FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)> + 'a,
{
    /// Pulls the next message and its capability from the wrapped input handle.
    ///
    /// Forwards directly to `InputHandle::next`.
    #[inline(always)]
    pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
        let inner = &mut *self.handle;
        inner.next()
    }

    /// Applies `logic` to the messages of the wrapped input handle.
    ///
    /// Forwards directly to `InputHandle::for_each`.
    #[inline]
    pub fn for_each<F: FnMut(Capability<T>, &mut Content<D>)>(&mut self, logic: F) {
        let inner = &mut *self.handle;
        inner.for_each(logic)
    }

    /// Returns the frontier reference stored in this handle.
    ///
    /// The returned reference carries the full lifetime `'a`, not merely the
    /// lifetime of the `&self` borrow.
    #[inline(always)]
    pub fn frontier(&self) -> &'a MutableAntichain<T> {
        let frontier: &'a MutableAntichain<T> = self.frontier;
        frontier
    }
}
pub fn _access_pull_counter<T: Timestamp, D, P: Pull<(T, Content<D>)>>(input: &mut InputHandle<T, D, P>) -> &mut PullCounter<T, D, P> {
&mut input.pull_counter
}
/// Builds an `InputHandle` from its two parts.
///
/// `pull_counter` supplies the incoming messages; `internal` is the shared
/// change batch stored in the handle and later passed to minted capabilities.
pub fn new_input_handle<T, D, P>(
    pull_counter: PullCounter<T, D, P>,
    internal: Rc<RefCell<ChangeBatch<T>>>,
) -> InputHandle<T, D, P>
where
    T: Timestamp,
    P: Pull<(T, Content<D>)>,
{
    let handle = InputHandle {
        pull_counter: pull_counter,
        internal: internal,
    };
    handle
}
/// Builds a `FrontieredInputHandle` that borrows `input_handle` and `frontier`
/// for the lifetime `'a`.
pub fn new_frontier_input_handle<'a, T, D, P>(
    input_handle: &'a mut InputHandle<T, D, P>,
    frontier: &'a MutableAntichain<T>,
) -> FrontieredInputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Pull<(T, Content<D>)> + 'a,
{
    let frontiered = FrontieredInputHandle {
        handle: input_handle,
        frontier: frontier,
    };
    frontiered
}
/// Handle through which output sessions are opened.
///
/// Wraps a mutable borrow of the push buffer; `cease` is called on that buffer
/// when the handle is dropped.
pub struct OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Push<(T, Content<D>)> + 'a,
{
    /// Buffer that sessions created from this handle write into.
    push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
}
impl<'a, T, D, P> OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Opens a session on the underlying push buffer for the capability `cap`.
    ///
    /// The session borrows both this handle and `cap` for `'b`. Forwards
    /// directly to `Buffer::session`.
    pub fn session<'b>(&'b mut self, cap: &'b Capability<T>) -> Session<'b, T, D, PushCounter<T, D, P>>
    where
        'a: 'b,
    {
        let buffer = &mut *self.push_buffer;
        buffer.session(cap)
    }
}
impl<'a, T, D, P> Drop for OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Calls `cease` on the wrapped push buffer when the handle goes away.
    // NOTE(review): `cease` presumably flushes pending output — confirm against `Buffer`.
    fn drop(&mut self) {
        let buffer = &mut *self.push_buffer;
        buffer.cease();
    }
}
/// Builds an `OutputHandle` that wraps `push_buffer` for the lifetime `'a`.
pub fn new_output_handle<'a, T, D, P>(
    push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
) -> OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    let handle = OutputHandle {
        push_buffer: push_buffer,
    };
    handle
}