1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use std::rc::Rc;
use std::cell::RefCell;
use progress::Timestamp;
use progress::count_map::CountMap;
use dataflow::channels::pullers::Counter as PullCounter;
use dataflow::channels::pushers::Counter as PushCounter;
use dataflow::channels::pushers::buffer::{Buffer, Session};
use dataflow::channels::Content;
use timely_communication::Push;
use dataflow::operators::Capability;
use dataflow::operators::capability::mint as mint_capability;
/// Handle through which an operator reads its input.
///
/// Borrows a `PullCounter` for the lifetime `'a` and holds a shared,
/// reference-counted `CountMap` that is passed along to every capability
/// minted by `next`.
pub struct InputHandle<'a, T, D>
where
    T: Timestamp,
    D: 'a,
{
    /// Source of `(time, content)` pairs.
    pull_counter: &'a mut PullCounter<T, D>,
    /// Shared count map; a clone of this `Rc` is handed to each minted capability.
    /// NOTE(review): presumably records the operator's internal capability
    /// changes — confirm against `capability::mint`.
    internal: Rc<RefCell<CountMap<T>>>,
}
impl<'a, T, D> InputHandle<'a, T, D>
where
    T: Timestamp,
{
    /// Pulls the next `(time, content)` pair from the underlying pull counter.
    ///
    /// On success the timestamp is turned into a `Capability` via
    /// `mint_capability`, which also receives a clone of the shared count map
    /// handle, and is returned together with the mutable content reference.
    /// Returns `None` whenever the pull counter yields `None`.
    #[inline]
    pub fn next(&mut self) -> Option<(Capability<T>, &mut Content<D>)> {
        // Borrow the field on its own so the closure below does not need to
        // capture `self` while `pull_counter` is mutably borrowed.
        let shared = &self.internal;
        self.pull_counter.next().map(|(time, content)| {
            let capability = mint_capability(*time, shared.clone());
            (capability, content)
        })
    }

    /// Invokes `logic` on every `(capability, content)` pair produced by
    /// `next`, stopping at the first `None`.
    ///
    /// Each invocation of `logic` is bracketed by a `GUARDED_MESSAGE` log
    /// event: `true` immediately before the call and `false` immediately after.
    #[inline]
    pub fn for_each<F: FnMut(Capability<T>, &mut Content<D>)>(&mut self, mut logic: F) {
        loop {
            let (capability, content) = match self.next() {
                Some(pair) => pair,
                None => break,
            };
            ::logging::log(&::logging::GUARDED_MESSAGE, true);
            logic(capability, content);
            ::logging::log(&::logging::GUARDED_MESSAGE, false);
        }
    }
}
/// Builds an `InputHandle` from a borrowed pull counter and a shared count map.
///
/// Both arguments are stored as-is in the returned handle; nothing is pulled
/// or recorded at construction time.
pub fn new_input_handle<'a, T, D>(
    pull_counter: &'a mut PullCounter<T, D>,
    internal: Rc<RefCell<CountMap<T>>>,
) -> InputHandle<'a, T, D>
where
    T: Timestamp,
    D: 'a,
{
    InputHandle {
        internal: internal,
        pull_counter: pull_counter,
    }
}
/// Handle through which an operator writes its output.
///
/// Wraps a mutable borrow of a `Buffer` whose pusher is a `PushCounter`
/// around the user-supplied pusher `P`.
pub struct OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    D: 'a,
    P: Push<(T, Content<D>)> + 'a,
{
    /// Buffer from which output sessions are opened.
    push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
}
impl<'a, T, D, P> OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    /// Opens a `Session` on the underlying buffer for the given capability.
    ///
    /// The returned session borrows this handle for `'b`, so the handle cannot
    /// be used again until the session is dropped. The work is delegated
    /// entirely to `Buffer::session`.
    /// NOTE(review): presumably the session sends data at the capability's
    /// timestamp — confirm against `Buffer::session`.
    pub fn session<'b>(&'b mut self, cap: &Capability<T>) -> Session<'b, T, D, PushCounter<T, D, P>>
    where
        'a: 'b,
    {
        // Explicit reborrow of the stored `&'a mut` for the shorter lifetime `'b`.
        let buffer = &mut *self.push_buffer;
        buffer.session(cap)
    }
}
/// Builds an `OutputHandle` around a mutably borrowed push buffer.
///
/// The buffer reference is stored unchanged; no data is pushed at
/// construction time.
pub fn new_output_handle<'a, T, D, P>(
    push_buffer: &'a mut Buffer<T, D, PushCounter<T, D, P>>,
) -> OutputHandle<'a, T, D, P>
where
    T: Timestamp,
    P: Push<(T, Content<D>)>,
{
    OutputHandle {
        push_buffer: push_buffer,
    }
}