palimpsest_dataflow/input.rs
1//! Input sessions for simplified collection updates.
2//!
3//! Although users can directly manipulate timely dataflow streams as collection inputs,
4//! the `InputSession` type can make this more efficient and less error-prone. Specifically,
5//! the type batches up updates with their logical times and ships them with coarsened
6//! timely dataflow capabilities, exposing more concurrency to the operator implementations
7//! than are evident from the logical times, which appear to execute in sequence.
8
9use timely::dataflow::operators::input::Handle;
10use timely::dataflow::operators::Input as TimelyInput;
11use timely::dataflow::scopes::ScopeParent;
12use timely::progress::Timestamp;
13
14use crate::collection::{AsCollection, VecCollection};
15use crate::difference::Semigroup;
16use crate::Data;
17
18/// Create a new collection and input handle to control the collection.
19pub trait Input: TimelyInput {
20 /// Create a new collection and input handle to subsequently control the collection.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use timely::Config;
26 /// use palimpsest_dataflow::input::Input;
27 ///
28 /// ::timely::execute(Config::thread(), |worker| {
29 ///
30 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
31 /// // create input handle and collection.
32 /// let (handle, data) = scope.new_collection();
33 /// let probe = data.map(|x| x * 2)
34 /// .inspect(|x| println!("{:?}", x))
35 /// .probe();
36 /// (handle, probe)
37 /// });
38 ///
39 /// handle.insert(1);
40 /// handle.insert(5);
41 ///
42 /// }).unwrap();
43 /// ```
44 fn new_collection<D, R>(
45 &mut self,
46 ) -> (
47 InputSession<<Self as ScopeParent>::Timestamp, D, R>,
48 VecCollection<Self, D, R>,
49 )
50 where
51 D: Data,
52 R: Semigroup + 'static;
53 /// Create a new collection and input handle from initial data.
54 ///
55 /// # Examples
56 ///
57 /// ```
58 /// use timely::Config;
59 /// use palimpsest_dataflow::input::Input;
60 ///
61 /// ::timely::execute(Config::thread(), |worker| {
62 ///
63 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
64 /// // create input handle and collection.
65 /// let (handle, data) = scope.new_collection_from(0 .. 10);
66 /// let probe = data.map(|x| x * 2)
67 /// .inspect(|x| println!("{:?}", x))
68 /// .probe();
69 /// (handle, probe)
70 /// });
71 ///
72 /// handle.insert(1);
73 /// handle.insert(5);
74 ///
75 /// }).unwrap();
76 /// ```
77 fn new_collection_from<I>(
78 &mut self,
79 data: I,
80 ) -> (
81 InputSession<<Self as ScopeParent>::Timestamp, I::Item, isize>,
82 VecCollection<Self, I::Item, isize>,
83 )
84 where
85 I: IntoIterator<Item: Data> + 'static;
86 /// Create a new collection and input handle from initial data.
87 ///
88 /// # Examples
89 ///
90 /// ```
91 /// use timely::Config;
92 /// use palimpsest_dataflow::input::Input;
93 ///
94 /// ::timely::execute(Config::thread(), |worker| {
95 ///
96 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
97 /// // create input handle and collection.
98 /// let (handle, data) = scope.new_collection_from(0 .. 10);
99 /// let probe = data.map(|x| x * 2)
100 /// .inspect(|x| println!("{:?}", x))
101 /// .probe();
102 /// (handle, probe)
103 /// });
104 ///
105 /// handle.insert(1);
106 /// handle.insert(5);
107 ///
108 /// }).unwrap();
109 /// ```
110 fn new_collection_from_raw<D, R, I>(
111 &mut self,
112 data: I,
113 ) -> (
114 InputSession<<Self as ScopeParent>::Timestamp, D, R>,
115 VecCollection<Self, D, R>,
116 )
117 where
118 I: IntoIterator<Item = (D, <Self as ScopeParent>::Timestamp, R)> + 'static,
119 D: Data,
120 R: Semigroup + 'static;
121}
122
123use crate::lattice::Lattice;
124impl<G: TimelyInput> Input for G
125where
126 <G as ScopeParent>::Timestamp: Lattice,
127{
128 fn new_collection<D, R>(
129 &mut self,
130 ) -> (
131 InputSession<<G as ScopeParent>::Timestamp, D, R>,
132 VecCollection<G, D, R>,
133 )
134 where
135 D: Data,
136 R: Semigroup + 'static,
137 {
138 let (handle, stream) = self.new_input();
139 (InputSession::from(handle), stream.as_collection())
140 }
141 fn new_collection_from<I>(
142 &mut self,
143 data: I,
144 ) -> (
145 InputSession<<G as ScopeParent>::Timestamp, I::Item, isize>,
146 VecCollection<G, I::Item, isize>,
147 )
148 where
149 I: IntoIterator + 'static,
150 I::Item: Data,
151 {
152 self.new_collection_from_raw(data.into_iter().map(|d| {
153 (
154 d,
155 <G::Timestamp as timely::progress::Timestamp>::minimum(),
156 1,
157 )
158 }))
159 }
160 fn new_collection_from_raw<D, R, I>(
161 &mut self,
162 data: I,
163 ) -> (
164 InputSession<<G as ScopeParent>::Timestamp, D, R>,
165 VecCollection<G, D, R>,
166 )
167 where
168 D: Data,
169 R: Semigroup + 'static,
170 I: IntoIterator<Item = (D, <Self as ScopeParent>::Timestamp, R)> + 'static,
171 {
172 use timely::dataflow::operators::ToStream;
173
174 let (handle, stream) = self.new_input();
175 let source = data.to_stream(self).as_collection();
176
177 (
178 InputSession::from(handle),
179 stream.as_collection().concat(&source),
180 )
181 }
182}
183
184/// An input session wrapping a single timely dataflow capability.
185///
186/// Each timely dataflow message has a corresponding capability, which is a logical time in the
187/// timely dataflow system. Differential dataflow updates can happen at a much higher rate than
188/// timely dataflow's progress tracking infrastructure supports, because the logical times are
189/// promoted to data and updates are batched together. The `InputSession` type does this batching.
190///
191/// # Examples
192///
193/// ```
194/// use timely::Config;
195/// use palimpsest_dataflow::input::Input;
196///
197/// ::timely::execute(Config::thread(), |worker| {
198///
199/// let (mut handle, probe) = worker.dataflow(|scope| {
200/// // create input handle and collection.
201/// let (handle, data) = scope.new_collection_from(0 .. 10);
202/// let probe = data.map(|x| x * 2)
203/// .inspect(|x| println!("{:?}", x))
204/// .probe();
205/// (handle, probe)
206/// });
207///
208/// handle.insert(3);
209/// handle.advance_to(1);
210/// handle.insert(5);
211/// handle.advance_to(2);
212/// handle.flush();
213///
214/// while probe.less_than(handle.time()) {
215/// worker.step();
216/// }
217///
218/// handle.remove(5);
219/// handle.advance_to(3);
220/// handle.flush();
221///
222/// while probe.less_than(handle.time()) {
223/// worker.step();
224/// }
225///
226/// }).unwrap();
227/// ```
228pub struct InputSession<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> {
229 time: T,
230 buffer: Vec<(D, T, R)>,
231 handle: Handle<T, (D, T, R)>,
232}
233
234impl<T: Timestamp + Clone, D: Data> InputSession<T, D, isize> {
235 /// Adds an element to the collection.
236 pub fn insert(&mut self, element: D) {
237 self.update(element, 1);
238 }
239 /// Removes an element from the collection.
240 pub fn remove(&mut self, element: D) {
241 self.update(element, -1);
242 }
243}
244
245// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i64> {
246// /// Adds an element to the collection.
247// pub fn insert(&mut self, element: D) { self.update(element, 1); }
248// /// Removes an element from the collection.
249// pub fn remove(&mut self, element: D) { self.update(element,-1); }
250// }
251
252// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i32> {
253// /// Adds an element to the collection.
254// pub fn insert(&mut self, element: D) { self.update(element, 1); }
255// /// Removes an element from the collection.
256// pub fn remove(&mut self, element: D) { self.update(element,-1); }
257// }
258
259impl<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> InputSession<T, D, R> {
260 /// Introduces a handle as collection.
261 pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> VecCollection<G, D, R>
262 where
263 G: ScopeParent<Timestamp = T>,
264 {
265 scope.input_from(&mut self.handle).as_collection()
266 }
267
268 /// Allocates a new input handle.
269 pub fn new() -> Self {
270 let handle: Handle<T, _> = Handle::new();
271 InputSession {
272 time: handle.time().clone(),
273 buffer: Vec::new(),
274 handle,
275 }
276 }
277
278 /// Creates a new session from a reference to an input handle.
279 pub fn from(handle: Handle<T, (D, T, R)>) -> Self {
280 InputSession {
281 time: handle.time().clone(),
282 buffer: Vec::new(),
283 handle,
284 }
285 }
286
287 /// Adds to the weight of an element in the collection.
288 pub fn update(&mut self, element: D, change: R) {
289 if self.buffer.len() == self.buffer.capacity() {
290 if !self.buffer.is_empty() {
291 self.handle.send_batch(&mut self.buffer);
292 }
293 // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
294 self.buffer.reserve(1024);
295 }
296 self.buffer.push((element, self.time.clone(), change));
297 }
298
299 /// Adds to the weight of an element in the collection at a future time.
300 pub fn update_at(&mut self, element: D, time: T, change: R) {
301 assert!(self.time.less_equal(&time));
302 if self.buffer.len() == self.buffer.capacity() {
303 if !self.buffer.is_empty() {
304 self.handle.send_batch(&mut self.buffer);
305 }
306 // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
307 self.buffer.reserve(1024);
308 }
309 self.buffer.push((element, time, change));
310 }
311
312 /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session.
313 ///
314 /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is
315 /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is
316 /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
317 pub fn flush(&mut self) {
318 self.handle.send_batch(&mut self.buffer);
319 if self.handle.epoch().less_than(&self.time) {
320 self.handle.advance_to(self.time.clone());
321 }
322 }
323
324 /// Advances the logical time for future records.
325 ///
326 /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when
327 /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
328 /// method unless the session has just been flushed.
329 pub fn advance_to(&mut self, time: T) {
330 assert!(self.handle.epoch().less_equal(&time));
331 assert!(&self.time.less_equal(&time));
332 self.time = time;
333 }
334
335 /// Reveals the current time of the session.
336 pub fn epoch(&self) -> &T {
337 &self.time
338 }
339 /// Reveals the current time of the session.
340 pub fn time(&self) -> &T {
341 &self.time
342 }
343
344 /// Closes the input, flushing and sealing the wrapped timely input.
345 pub fn close(self) {}
346}
347
348impl<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> Drop for InputSession<T, D, R> {
349 fn drop(&mut self) {
350 self.flush();
351 }
352}