1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
//! Create new `Streams` connected to external inputs.
use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use progress::frontier::Antichain;
use progress::{Operate, Timestamp};
use progress::nested::subgraph::Source;
use progress::count_map::CountMap;
use progress::timestamp::RootTimestamp;
use progress::nested::product::Product;
use timely_communication::Allocate;
use {Data, Push};
use dataflow::channels::Content;
use dataflow::channels::pushers::{Tee, Counter};
use dataflow::{Stream, Scope};
use dataflow::scopes::{Child, Root};
// TODO : This is an exogenous input, but it would be nice to wrap a Subgraph in something
// TODO : more like a harness, with direct access to its inputs.
// NOTE(review): the three notes below look stale; `new_input` is declared with `&mut self`.
// NOTE : This only takes a &self, not a &mut self, which works but is a bit weird.
// NOTE : Experiments with &mut indicate that the borrow of 'a lives for too long.
// NOTE : Might be able to fix with another lifetime parameter, say 'c: 'a.
/// Adds the ability to create externally supplied inputs to a dataflow scope.
pub trait Input<'a, A, T>
where
    A: Allocate,
    T: Timestamp + Ord,
{
    /// Creates a new input, returning the `Handle` that feeds it and the `Stream` it produces.
    ///
    /// The result is a `(Handle, Stream)` pair. The `Stream` is usable right away, while the
    /// timely dataflow is still being constructed; the `Handle` is kept and used afterwards
    /// to introduce data into the timely dataflow computation.
    ///
    /// The `Handle` is also the means by which the caller tells timely dataflow that the
    /// input has advanced beyond certain timestamps, which allows timely to issue progress
    /// notifications.
    ///
    /// # Examples
    /// ```
    /// use timely::*;
    /// use timely::dataflow::operators::{Input, Inspect};
    ///
    /// // construct and execute a timely dataflow
    /// timely::execute(Configuration::Thread, |worker| {
    ///
    ///     // add an input and base computation off of it
    ///     let mut input = worker.dataflow(|scope| {
    ///         let (input, stream) = scope.new_input();
    ///         stream.inspect(|x| println!("hello {:?}", x));
    ///         input
    ///     });
    ///
    ///     // introduce input, advance computation
    ///     for round in 0..10 {
    ///         input.send(round);
    ///         input.advance_to(round + 1);
    ///         worker.step();
    ///     }
    /// });
    /// ```
    fn new_input<D: Data>(&mut self) -> (Handle<T, D>, Stream<Child<'a, Root<A>, T>, D>);
}
impl<'a, A, T> Input<'a, A, T> for Child<'a, Root<A>, T>
where
    A: Allocate,
    T: Timestamp + Ord,
{
    /// Registers an input operator with this scope and returns the `Handle` that feeds it
    /// together with the `Stream` carrying its output.
    fn new_input<D: Data>(&mut self) -> (Handle<T, D>, Stream<Child<'a, Root<A>, T>, D>) {
        // Count map shared between the handle's counting pusher and the operator.
        let sent = Rc::new(RefCell::new(CountMap::new()));

        // The tee is the handle's sending side; the registrar goes to the stream.
        let (tee, registrar) = Tee::<Product<RootTimestamp, T>, D>::new();
        let handle = Handle::new(Counter::new(tee, sent.clone()));

        // The operator shares the handle's `progress` map and the pusher's count map.
        let operator = Operator {
            progress: handle.progress.clone(),
            messages: sent,
            copies: self.peers(),
        };
        let index = self.add_operator(operator);

        let source = Source { index: index, port: 0 };
        (handle, Stream::new(source, registrar, self.clone()))
    }
}
/// The operator installed into the scope on behalf of an input; it has no inputs and a
/// single output, and reports progress from the two shared count maps below.
struct Operator<T>
where
    T: Timestamp + Ord,
{
    /// Times closed since last asked.
    progress: Rc<RefCell<CountMap<Product<RootTimestamp, T>>>>,
    /// Messages sent since last asked.
    messages: Rc<RefCell<CountMap<Product<RootTimestamp, T>>>>,
    /// Count reported at the default timestamp by `get_internal_summary`; set from the
    /// scope's `peers()` when the input is created.
    copies: usize,
}
// Progress-reporting interface of the input operator.
impl<T> Operate<Product<RootTimestamp, T>> for Operator<T>
where
    T: Timestamp + Ord,
{
    /// The operator's name, `"Input"`.
    fn name(&self) -> String { String::from("Input") }

    /// The input operator has no inputs.
    fn inputs(&self) -> usize { 0 }

    /// The input operator has exactly one output.
    fn outputs(&self) -> usize { 1 }

    /// Returns an empty list of summaries and, for the single output, a count map holding
    /// `copies` at the default timestamp.
    fn get_internal_summary(&mut self) -> (Vec<Vec<Antichain<<Product<RootTimestamp, T> as Timestamp>::Summary>>>,
                                           Vec<CountMap<Product<RootTimestamp, T>>>) {
        let start: Product<RootTimestamp, T> = Default::default();
        let mut initial = CountMap::new();
        initial.update(&start, self.copies as i64);
        (vec![], vec![initial])
    }

    /// Drains the accumulated message counts and closed times into slot 0 of
    /// `messages_produced` and `frontier_progress` respectively. Always returns `false`.
    fn pull_internal_progress(&mut self, _messages_consumed: &mut [CountMap<Product<RootTimestamp, T>>],
                                         frontier_progress: &mut [CountMap<Product<RootTimestamp, T>>],
                                         messages_produced: &mut [CountMap<Product<RootTimestamp, T>>]) -> bool
    {
        let produced_out = &mut messages_produced[0];
        self.messages.borrow_mut().drain_into(produced_out);

        let frontier_out = &mut frontier_progress[0];
        self.progress.borrow_mut().drain_into(frontier_out);

        false
    }

    /// The input operator does not ask to be notified.
    fn notify_me(&self) -> bool { false }
}
/// The user-facing end of an input `Stream`: records sent through a `Handle` enter the
/// timely dataflow computation at the handle's current time.
pub struct Handle<T, D>
where
    T: Timestamp,
    D: Data,
{
    // frontier: Rc<RefCell<MutableAntichain<Product<RootTimestamp, T>>>>, // times available for sending
    /// Times closed since last asked.
    progress: Rc<RefCell<CountMap<Product<RootTimestamp, T>>>>,
    /// Where flushed records are pushed.
    pusher: Counter<Product<RootTimestamp, T>, D, Tee<Product<RootTimestamp, T>, D>>,
    /// Records accepted by `send` that have not been flushed yet.
    buffer: Vec<D>,
    /// The time at which records are currently sent.
    now_at: Product<RootTimestamp, T>,
}
// An input handle is always positioned at exactly one time, `now_at`, which starts out as
// `Default::default()`. `advance_to` closes that time (a -1 update in `progress`) and takes up
// the next one (a +1 update); dropping the handle closes whatever time it is at.
impl<T:Timestamp, D: Data> Handle<T, D> {
    /// Builds a handle positioned at the default time whose records are sent into `pusher`.
    fn new(pusher: Counter<Product<RootTimestamp, T>, D, Tee<Product<RootTimestamp, T>, D>>) -> Handle<T, D> {
        Handle {
            // frontier: Rc::new(RefCell::new(MutableAntichain::new_bottom(Default::default()))),
            progress: Rc::new(RefCell::new(CountMap::new())),
            pusher: pusher,
            buffer: Vec::with_capacity(Content::<D>::default_length()),
            now_at: Default::default(),
        }
    }

    /// Pushes the buffered records into `pusher` at the current time.
    fn flush(&mut self) {
        Content::push_at(&mut self.buffer, self.now_at.clone(), &mut self.pusher);
    }

    /// Closes the current epoch: flushes buffered records if there are any, signals `done`
    /// to the pusher, and records a -1 update for `now_at` in `progress`.
    fn close_epoch(&mut self) {
        if !self.buffer.is_empty() { self.flush(); }
        self.pusher.done();
        self.progress.borrow_mut().update(&self.now_at, -1);
    }

    /// Sends one record into the corresponding timely dataflow `Stream`, at the current epoch.
    ///
    /// The record is buffered; the buffer is flushed once its length reaches its capacity.
    #[inline(always)]
    pub fn send(&mut self, data: D) {
        // assert!(self.buffer.capacity() == Content::<D>::default_length());
        self.buffer.push(data);
        if self.buffer.len() == self.buffer.capacity() {
            self.flush();
        }
    }

    /// Sends a batch of records into the corresponding timely dataflow `Stream`, at the current epoch.
    ///
    /// Records previously sent one at a time with `send` are flushed first, to keep the
    /// insertion order. An empty `buffer` is a no-op: nothing is flushed and nothing is pushed.
    ///
    /// NOTE(review): `buffer` is handed to `Content::push_at` by `&mut`; presumably it comes
    /// back empty and ready for reuse — confirm against `Content::push_at`.
    pub fn send_batch(&mut self, buffer: &mut Vec<D>) {
        // An empty batch carries no records, so there is nothing to order against or to push.
        if buffer.is_empty() { return; }

        // Same guard as `close_epoch`: only flush when there is something buffered, so that
        // no empty message is pushed downstream.
        if !self.buffer.is_empty() { self.flush(); }

        Content::push_at(buffer, self.now_at.clone(), &mut self.pusher);
    }

    /// Advances the current epoch to `next`.
    ///
    /// This method allows timely dataflow to issue progress notifications as it can now determine
    /// that this input can no longer produce data at earlier timestamps.
    ///
    /// # Panics
    ///
    /// Panics unless the current epoch is `less_than` `next`.
    pub fn advance_to(&mut self, next: T) {
        assert!(self.now_at.inner.less_than(&next),
                "Handle::advance_to: the current epoch must be less than `next`");
        self.close_epoch();
        self.now_at = RootTimestamp::new(next);
        self.progress.borrow_mut().update(&self.now_at, 1);
    }

    /// Closes the input.
    ///
    /// This method allows timely dataflow to issue all progress notifications blocked by this input
    /// and to begin to shut down operators, as this input can no longer produce data.
    ///
    /// The body is intentionally empty: taking `self` by value drops the handle, and the
    /// `Drop` implementation closes the current epoch.
    pub fn close(self) { }

    /// Reports the current epoch.
    pub fn epoch(&self) -> &T {
        &self.now_at.inner
    }

    /// Reports the current timestamp.
    pub fn time(&self) -> &Product<RootTimestamp, T> {
        &self.now_at
    }
}
// Dropping a handle closes its current epoch, which is what makes `Handle::close` work.
impl<T, D> Drop for Handle<T, D>
where
    T: Timestamp,
    D: Data,
{
    /// Closes the epoch the handle is currently at (see `close_epoch`).
    fn drop(&mut self) {
        self.close_epoch();
    }
}