Skip to main content

palimpsest_dataflow/
input.rs

//! Input sessions for simplified collection updates.
//!
//! Although users can directly manipulate timely dataflow streams as collection inputs,
//! the `InputSession` type can make this more efficient and less error-prone. Specifically,
//! the type batches up updates with their logical times and ships them with coarsened
//! timely dataflow capabilities, exposing more concurrency to the operator implementations
//! than is evident from the logical times, which appear to execute in sequence.
8
9use timely::dataflow::operators::input::Handle;
10use timely::dataflow::operators::Input as TimelyInput;
11use timely::dataflow::scopes::ScopeParent;
12use timely::progress::Timestamp;
13
14use crate::collection::{AsCollection, VecCollection};
15use crate::difference::Semigroup;
16use crate::Data;
17
18/// Create a new collection and input handle to control the collection.
19pub trait Input: TimelyInput {
20    /// Create a new collection and input handle to subsequently control the collection.
21    ///
22    /// # Examples
23    ///
24    /// ```
25    /// use timely::Config;
26    /// use palimpsest_dataflow::input::Input;
27    ///
28    /// ::timely::execute(Config::thread(), |worker| {
29    ///
30    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
31    ///         // create input handle and collection.
32    ///         let (handle, data) = scope.new_collection();
33    ///         let probe = data.map(|x| x * 2)
34    ///                         .inspect(|x| println!("{:?}", x))
35    ///                         .probe();
36    ///         (handle, probe)
37    ///     });
38    ///
39    ///     handle.insert(1);
40    ///     handle.insert(5);
41    ///
42    /// }).unwrap();
43    /// ```
44    fn new_collection<D, R>(
45        &mut self,
46    ) -> (
47        InputSession<<Self as ScopeParent>::Timestamp, D, R>,
48        VecCollection<Self, D, R>,
49    )
50    where
51        D: Data,
52        R: Semigroup + 'static;
53    /// Create a new collection and input handle from initial data.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use timely::Config;
59    /// use palimpsest_dataflow::input::Input;
60    ///
61    /// ::timely::execute(Config::thread(), |worker| {
62    ///
63    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
64    ///         // create input handle and collection.
65    ///          let (handle, data) = scope.new_collection_from(0 .. 10);
66    ///          let probe = data.map(|x| x * 2)
67    ///                          .inspect(|x| println!("{:?}", x))
68    ///                          .probe();
69    ///          (handle, probe)
70    ///     });
71    ///
72    ///     handle.insert(1);
73    ///     handle.insert(5);
74    ///
75    /// }).unwrap();
76    /// ```
77    fn new_collection_from<I>(
78        &mut self,
79        data: I,
80    ) -> (
81        InputSession<<Self as ScopeParent>::Timestamp, I::Item, isize>,
82        VecCollection<Self, I::Item, isize>,
83    )
84    where
85        I: IntoIterator<Item: Data> + 'static;
86    /// Create a new collection and input handle from initial data.
87    ///
88    /// # Examples
89    ///
90    /// ```
91    /// use timely::Config;
92    /// use palimpsest_dataflow::input::Input;
93    ///
94    /// ::timely::execute(Config::thread(), |worker| {
95    ///
96    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
97    ///         // create input handle and collection.
98    ///         let (handle, data) = scope.new_collection_from(0 .. 10);
99    ///         let probe = data.map(|x| x * 2)
100    ///                         .inspect(|x| println!("{:?}", x))
101    ///                         .probe();
102    ///         (handle, probe)
103    ///     });
104    ///
105    ///     handle.insert(1);
106    ///     handle.insert(5);
107    ///
108    /// }).unwrap();
109    /// ```
110    fn new_collection_from_raw<D, R, I>(
111        &mut self,
112        data: I,
113    ) -> (
114        InputSession<<Self as ScopeParent>::Timestamp, D, R>,
115        VecCollection<Self, D, R>,
116    )
117    where
118        I: IntoIterator<Item = (D, <Self as ScopeParent>::Timestamp, R)> + 'static,
119        D: Data,
120        R: Semigroup + 'static;
121}
122
123use crate::lattice::Lattice;
124impl<G: TimelyInput> Input for G
125where
126    <G as ScopeParent>::Timestamp: Lattice,
127{
128    fn new_collection<D, R>(
129        &mut self,
130    ) -> (
131        InputSession<<G as ScopeParent>::Timestamp, D, R>,
132        VecCollection<G, D, R>,
133    )
134    where
135        D: Data,
136        R: Semigroup + 'static,
137    {
138        let (handle, stream) = self.new_input();
139        (InputSession::from(handle), stream.as_collection())
140    }
141    fn new_collection_from<I>(
142        &mut self,
143        data: I,
144    ) -> (
145        InputSession<<G as ScopeParent>::Timestamp, I::Item, isize>,
146        VecCollection<G, I::Item, isize>,
147    )
148    where
149        I: IntoIterator + 'static,
150        I::Item: Data,
151    {
152        self.new_collection_from_raw(data.into_iter().map(|d| {
153            (
154                d,
155                <G::Timestamp as timely::progress::Timestamp>::minimum(),
156                1,
157            )
158        }))
159    }
160    fn new_collection_from_raw<D, R, I>(
161        &mut self,
162        data: I,
163    ) -> (
164        InputSession<<G as ScopeParent>::Timestamp, D, R>,
165        VecCollection<G, D, R>,
166    )
167    where
168        D: Data,
169        R: Semigroup + 'static,
170        I: IntoIterator<Item = (D, <Self as ScopeParent>::Timestamp, R)> + 'static,
171    {
172        use timely::dataflow::operators::ToStream;
173
174        let (handle, stream) = self.new_input();
175        let source = data.to_stream(self).as_collection();
176
177        (
178            InputSession::from(handle),
179            stream.as_collection().concat(&source),
180        )
181    }
182}
183
/// An input session wrapping a single timely dataflow capability.
///
/// Each timely dataflow message has a corresponding capability, which is a logical time in the
/// timely dataflow system. Differential dataflow updates can happen at a much higher rate than
/// timely dataflow's progress tracking infrastructure supports, because the logical times are
/// promoted to data and updates are batched together. The `InputSession` type does this batching.
///
/// Updates are held in an internal buffer and handed to the wrapped timely input only when that
/// buffer fills, when [`InputSession::flush`] is called, or when the session is dropped.
///
/// # Examples
///
/// ```
/// use timely::Config;
/// use palimpsest_dataflow::input::Input;
///
/// ::timely::execute(Config::thread(), |worker| {
///
///     let (mut handle, probe) = worker.dataflow(|scope| {
///         // create input handle and collection.
///         let (handle, data) = scope.new_collection_from(0 .. 10);
///         let probe = data.map(|x| x * 2)
///                         .inspect(|x| println!("{:?}", x))
///                         .probe();
///         (handle, probe)
///     });
///
///     handle.insert(3);
///     handle.advance_to(1);
///     handle.insert(5);
///     handle.advance_to(2);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
///     handle.remove(5);
///     handle.advance_to(3);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
/// }).unwrap();
/// ```
pub struct InputSession<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> {
    // Logical time stamped onto updates made through `update`; it may run ahead of the
    // wrapped handle's epoch until the next flush.
    time: T,
    // Pending `(data, time, diff)` updates not yet handed to the timely input.
    buffer: Vec<(D, T, R)>,
    // The wrapped timely input handle. NOTE(review): declared last, so it is dropped after the
    // other fields (and after `Drop::drop` has flushed the buffer) — keep this field order.
    handle: Handle<T, (D, T, R)>,
}
233
234impl<T: Timestamp + Clone, D: Data> InputSession<T, D, isize> {
235    /// Adds an element to the collection.
236    pub fn insert(&mut self, element: D) {
237        self.update(element, 1);
238    }
239    /// Removes an element from the collection.
240    pub fn remove(&mut self, element: D) {
241        self.update(element, -1);
242    }
243}
244
245// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i64> {
246//     /// Adds an element to the collection.
247//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
248//     /// Removes an element from the collection.
249//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
250// }
251
252// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i32> {
253//     /// Adds an element to the collection.
254//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
255//     /// Removes an element from the collection.
256//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
257// }
258
259impl<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> InputSession<T, D, R> {
260    /// Introduces a handle as collection.
261    pub fn to_collection<G: TimelyInput>(&mut self, scope: &mut G) -> VecCollection<G, D, R>
262    where
263        G: ScopeParent<Timestamp = T>,
264    {
265        scope.input_from(&mut self.handle).as_collection()
266    }
267
268    /// Allocates a new input handle.
269    pub fn new() -> Self {
270        let handle: Handle<T, _> = Handle::new();
271        InputSession {
272            time: handle.time().clone(),
273            buffer: Vec::new(),
274            handle,
275        }
276    }
277
278    /// Creates a new session from a reference to an input handle.
279    pub fn from(handle: Handle<T, (D, T, R)>) -> Self {
280        InputSession {
281            time: handle.time().clone(),
282            buffer: Vec::new(),
283            handle,
284        }
285    }
286
287    /// Adds to the weight of an element in the collection.
288    pub fn update(&mut self, element: D, change: R) {
289        if self.buffer.len() == self.buffer.capacity() {
290            if !self.buffer.is_empty() {
291                self.handle.send_batch(&mut self.buffer);
292            }
293            // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
294            self.buffer.reserve(1024);
295        }
296        self.buffer.push((element, self.time.clone(), change));
297    }
298
299    /// Adds to the weight of an element in the collection at a future time.
300    pub fn update_at(&mut self, element: D, time: T, change: R) {
301        assert!(self.time.less_equal(&time));
302        if self.buffer.len() == self.buffer.capacity() {
303            if !self.buffer.is_empty() {
304                self.handle.send_batch(&mut self.buffer);
305            }
306            // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
307            self.buffer.reserve(1024);
308        }
309        self.buffer.push((element, time, change));
310    }
311
312    /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session.
313    ///
314    /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is
315    /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is
316    /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
317    pub fn flush(&mut self) {
318        self.handle.send_batch(&mut self.buffer);
319        if self.handle.epoch().less_than(&self.time) {
320            self.handle.advance_to(self.time.clone());
321        }
322    }
323
324    /// Advances the logical time for future records.
325    ///
326    /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when
327    /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
328    /// method unless the session has just been flushed.
329    pub fn advance_to(&mut self, time: T) {
330        assert!(self.handle.epoch().less_equal(&time));
331        assert!(&self.time.less_equal(&time));
332        self.time = time;
333    }
334
335    /// Reveals the current time of the session.
336    pub fn epoch(&self) -> &T {
337        &self.time
338    }
339    /// Reveals the current time of the session.
340    pub fn time(&self) -> &T {
341        &self.time
342    }
343
344    /// Closes the input, flushing and sealing the wrapped timely input.
345    pub fn close(self) {}
346}
347
348impl<T: Timestamp + Clone, D: Data, R: Semigroup + 'static> Drop for InputSession<T, D, R> {
349    fn drop(&mut self) {
350        self.flush();
351    }
352}