Skip to main content

differential_dataflow/
input.rs

//! Input sessions for simplified collection updates.
//!
//! Although users can directly manipulate timely dataflow streams as collection inputs,
//! the `InputSession` type can make this more efficient and less error-prone. Specifically,
//! the type batches up updates with their logical times and ships them with coarsened
//! timely dataflow capabilities, exposing more concurrency to the operator implementations
//! than is evident from the logical times, which appear to execute in sequence.
8
9use timely::progress::Timestamp;
10use timely::dataflow::operators::vec::Input as TimelyInput;
11use timely::dataflow::operators::vec::input::Handle;
12use timely::dataflow::Scope;
13
14use crate::Data;
15use crate::difference::Semigroup;
16use crate::collection::{VecCollection, AsCollection};
17
/// Create a new collection and input handle to control the collection.
///
/// Every method returns a pair: an [`InputSession`] through which updates are introduced,
/// and the [`VecCollection`] that carries those updates in the dataflow.
pub trait Input<'scope> : TimelyInput<'scope> {
    /// Create a new collection and input handle to subsequently control the collection.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, data) = scope.new_collection();
    ///         let (probe, _) = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection<D, R>(&self) -> (InputSession<Self::Timestamp, D, R>, VecCollection<'scope, Self::Timestamp, D, R>)
    where D: Data, R: Semigroup+'static;
    /// Create a new collection and input handle from initial data.
    ///
    /// The items of `data` seed the collection; the returned session has `isize` weights,
    /// so further elements can be added or removed with `insert` and `remove`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection.
    ///         let (handle, data) = scope.new_collection_from(0 .. 10);
    ///         let (probe, _) = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection_from<I>(&self, data: I) -> (InputSession<Self::Timestamp, I::Item, isize>, VecCollection<'scope, Self::Timestamp, I::Item, isize>)
    where I: IntoIterator<Item: Data> + 'static;
    /// Create a new collection and input handle from initial raw updates.
    ///
    /// Unlike `new_collection_from`, each item of `data` is a complete
    /// `(element, time, change)` triple, so the caller chooses the timestamp and the
    /// weight type `R` of every initial update.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::Config;
    /// use differential_dataflow::input::Input;
    ///
    /// ::timely::execute(Config::thread(), |worker| {
    ///
    ///     let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
    ///         // create input handle and collection from explicit update triples.
    ///         let (handle, data) = scope.new_collection_from_raw((0 .. 10).map(|x| (x, (), 1isize)));
    ///         let (probe, _) = data.map(|x| x * 2)
    ///                         .inspect(|x| println!("{:?}", x))
    ///                         .probe();
    ///         (handle, probe)
    ///     });
    ///
    ///     handle.insert(1);
    ///     handle.insert(5);
    ///
    /// }).unwrap();
    /// ```
    fn new_collection_from_raw<D, R, I>(&self, data: I) -> (InputSession<Self::Timestamp, D, R>, VecCollection<'scope, Self::Timestamp, D, R>)
    where I: IntoIterator<Item=(D,Self::Timestamp,R)>+'static, D: Data, R: Semigroup+'static;
}
99
100use crate::lattice::Lattice;
101impl<'scope, T: Timestamp + Lattice + timely::order::TotalOrder> Input<'scope> for Scope<'scope, T> {
102    fn new_collection<D, R>(&self) -> (InputSession<T, D, R>, VecCollection<'scope, T, D, R>)
103    where
104        D: Data, R: Semigroup+'static,
105    {
106        let (handle, stream) = self.new_input();
107        (InputSession::from(handle), stream.as_collection())
108    }
109    fn new_collection_from<I>(&self, data: I) -> (InputSession<T, I::Item, isize>, VecCollection<'scope, T, I::Item, isize>)
110    where I: IntoIterator+'static, I::Item: Data {
111        self.new_collection_from_raw(data.into_iter().map(|d| (d, <T as timely::progress::Timestamp>::minimum(), 1)))
112    }
113    fn new_collection_from_raw<D,R,I>(&self, data: I) -> (InputSession<T, D, R>, VecCollection<'scope, T, D, R>)
114    where
115        D: Data,
116        R: Semigroup+'static,
117        I: IntoIterator<Item=(D,T,R)>+'static,
118    {
119        use timely::dataflow::operators::ToStream;
120
121        let (handle, stream) = self.new_input();
122        let source = data.to_stream(*self).as_collection();
123
124        (InputSession::from(handle), stream.as_collection().concat(source))
125    }
126}
127
/// An input session wrapping a single timely dataflow capability.
///
/// Each timely dataflow message has a corresponding capability, which is a logical time in the
/// timely dataflow system. Differential dataflow updates can happen at a much higher rate than
/// timely dataflow's progress tracking infrastructure supports, because the logical times are
/// promoted to data and updates are batched together. The `InputSession` type does this batching.
///
/// Updates are held in an internal buffer and handed to the wrapped timely input handle in
/// batches; call `flush` (or drop the session) to push out whatever is still buffered.
///
/// # Examples
///
/// ```
/// use timely::Config;
/// use differential_dataflow::input::Input;
///
/// ::timely::execute(Config::thread(), |worker| {
///
///     let (mut handle, probe) = worker.dataflow(|scope| {
///         // create input handle and collection.
///         let (handle, data) = scope.new_collection_from(0 .. 10);
///         let (probe, _) = data.map(|x| x * 2)
///                         .inspect(|x| println!("{:?}", x))
///                         .probe();
///         (handle, probe)
///     });
///
///     handle.insert(3);
///     handle.advance_to(1);
///     handle.insert(5);
///     handle.advance_to(2);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
///     handle.remove(5);
///     handle.advance_to(3);
///     handle.flush();
///
///     while probe.less_than(handle.time()) {
///         worker.step();
///     }
///
/// }).unwrap();
/// ```
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup+'static> {
    /// The session's current logical time, stamped onto updates made through `update`.
    time: T,
    /// Updates accepted by the session but not yet sent to `handle`.
    buffer: Vec<(D, T, R)>,
    /// The wrapped timely input handle that receives batches of `(data, time, diff)` updates.
    handle: Handle<T,(D,T,R)>,
}
177
178impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
179    /// Adds an element to the collection.
180    pub fn insert(&mut self, element: D) { self.update(element, 1); }
181    /// Removes an element from the collection.
182    pub fn remove(&mut self, element: D) { self.update(element,-1); }
183}
184
185// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i64> {
186//     /// Adds an element to the collection.
187//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
188//     /// Removes an element from the collection.
189//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
190// }
191
192// impl<T: Timestamp+Clone, D: Data> InputSession<T, D, i32> {
193//     /// Adds an element to the collection.
194//     pub fn insert(&mut self, element: D) { self.update(element, 1); }
195//     /// Removes an element from the collection.
196//     pub fn remove(&mut self, element: D) { self.update(element,-1); }
197// }
198
199impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> InputSession<T, D, R> {
200
201    /// Introduces a handle as collection.
202    pub fn to_collection<'scope>(&mut self, scope: Scope<'scope, T>) -> VecCollection<'scope, T, D, R>
203    where
204        T: timely::order::TotalOrder,
205    {
206        scope
207            .input_from(&mut self.handle)
208            .as_collection()
209    }
210
211    /// Allocates a new input handle.
212    pub fn new() -> Self {
213        let handle: Handle<T,_> = Handle::new();
214        InputSession {
215            time: handle.time().clone(),
216            buffer: Vec::new(),
217            handle,
218        }
219    }
220
221    /// Creates a new session from a reference to an input handle.
222    pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
223        InputSession {
224            time: handle.time().clone(),
225            buffer: Vec::new(),
226            handle,
227        }
228    }
229
230    /// Adds to the weight of an element in the collection.
231    pub fn update(&mut self, element: D, change: R) {
232        if self.buffer.len() == self.buffer.capacity() {
233            if !self.buffer.is_empty() {
234                self.handle.send_batch(&mut self.buffer);
235            }
236            // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
237            self.buffer.reserve(1024);
238        }
239        self.buffer.push((element, self.time.clone(), change));
240    }
241
242    /// Adds to the weight of an element in the collection at a future time.
243    pub fn update_at(&mut self, element: D, time: T, change: R) {
244        assert!(self.time.less_equal(&time));
245        if self.buffer.len() == self.buffer.capacity() {
246            if !self.buffer.is_empty() {
247                self.handle.send_batch(&mut self.buffer);
248            }
249            // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
250            self.buffer.reserve(1024);
251        }
252        self.buffer.push((element, time, change));
253    }
254
255    /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session.
256    ///
257    /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is
258    /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is
259    /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
260    pub fn flush(&mut self) {
261        self.handle.send_batch(&mut self.buffer);
262        if self.handle.time().less_than(&self.time) {
263            self.handle.advance_to(self.time.clone());
264        }
265    }
266
267    /// Advances the logical time for future records.
268    ///
269    /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when
270    /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
271    /// method unless the session has just been flushed.
272    pub fn advance_to(&mut self, time: T) {
273        assert!(self.handle.time().less_equal(&time));
274        assert!(&self.time.less_equal(&time));
275        self.time = time;
276    }
277
278    /// Reveals the current time of the session.
279    pub fn epoch(&self) -> &T { &self.time }
280    /// Reveals the current time of the session.
281    pub fn time(&self) -> &T { &self.time }
282
283    /// Closes the input, flushing and sealing the wrapped timely input.
284    pub fn close(self) { }
285}
286
287impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> Drop for InputSession<T, D, R> {
288    fn drop(&mut self) {
289        self.flush();
290    }
291}