differential_dataflow/input.rs
//! Input sessions for simplified collection updates.
//!
//! Although users can directly manipulate timely dataflow streams as collection inputs,
//! the `InputSession` type can make this more efficient and less error-prone. Specifically,
//! the type batches up updates with their logical times and ships them with coarsened
//! timely dataflow capabilities, exposing more concurrency to the operator implementations
//! than is evident from the logical times, which appear to execute in sequence.
8
9use timely::progress::Timestamp;
10use timely::dataflow::operators::vec::Input as TimelyInput;
11use timely::dataflow::operators::vec::input::Handle;
12use timely::dataflow::Scope;
13
14use crate::Data;
15use crate::difference::Semigroup;
16use crate::collection::{VecCollection, AsCollection};
17
18/// Create a new collection and input handle to control the collection.
19pub trait Input<'scope> : TimelyInput<'scope> {
20 /// Create a new collection and input handle to subsequently control the collection.
21 ///
22 /// # Examples
23 ///
24 /// ```
25 /// use timely::Config;
26 /// use differential_dataflow::input::Input;
27 ///
28 /// ::timely::execute(Config::thread(), |worker| {
29 ///
30 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
31 /// // create input handle and collection.
32 /// let (handle, data) = scope.new_collection();
33 /// let (probe, _) = data.map(|x| x * 2)
34 /// .inspect(|x| println!("{:?}", x))
35 /// .probe();
36 /// (handle, probe)
37 /// });
38 ///
39 /// handle.insert(1);
40 /// handle.insert(5);
41 ///
42 /// }).unwrap();
43 /// ```
44 fn new_collection<D, R>(&self) -> (InputSession<Self::Timestamp, D, R>, VecCollection<'scope, Self::Timestamp, D, R>)
45 where D: Data, R: Semigroup+'static;
46 /// Create a new collection and input handle from initial data.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// use timely::Config;
52 /// use differential_dataflow::input::Input;
53 ///
54 /// ::timely::execute(Config::thread(), |worker| {
55 ///
56 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
57 /// // create input handle and collection.
58 /// let (handle, data) = scope.new_collection_from(0 .. 10);
59 /// let (probe, _) = data.map(|x| x * 2)
60 /// .inspect(|x| println!("{:?}", x))
61 /// .probe();
62 /// (handle, probe)
63 /// });
64 ///
65 /// handle.insert(1);
66 /// handle.insert(5);
67 ///
68 /// }).unwrap();
69 /// ```
70 fn new_collection_from<I>(&self, data: I) -> (InputSession<Self::Timestamp, I::Item, isize>, VecCollection<'scope, Self::Timestamp, I::Item, isize>)
71 where I: IntoIterator<Item: Data> + 'static;
72 /// Create a new collection and input handle from initial data.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// use timely::Config;
78 /// use differential_dataflow::input::Input;
79 ///
80 /// ::timely::execute(Config::thread(), |worker| {
81 ///
82 /// let (mut handle, probe) = worker.dataflow::<(),_,_>(|scope| {
83 /// // create input handle and collection.
84 /// let (handle, data) = scope.new_collection_from(0 .. 10);
85 /// let (probe, _) = data.map(|x| x * 2)
86 /// .inspect(|x| println!("{:?}", x))
87 /// .probe();
88 /// (handle, probe)
89 /// });
90 ///
91 /// handle.insert(1);
92 /// handle.insert(5);
93 ///
94 /// }).unwrap();
95 /// ```
96 fn new_collection_from_raw<D, R, I>(&self, data: I) -> (InputSession<Self::Timestamp, D, R>, VecCollection<'scope, Self::Timestamp, D, R>)
97 where I: IntoIterator<Item=(D,Self::Timestamp,R)>+'static, D: Data, R: Semigroup+'static;
98}
99
100use crate::lattice::Lattice;
101impl<'scope, T: Timestamp + Lattice + timely::order::TotalOrder> Input<'scope> for Scope<'scope, T> {
102 fn new_collection<D, R>(&self) -> (InputSession<T, D, R>, VecCollection<'scope, T, D, R>)
103 where
104 D: Data, R: Semigroup+'static,
105 {
106 let (handle, stream) = self.new_input();
107 (InputSession::from(handle), stream.as_collection())
108 }
109 fn new_collection_from<I>(&self, data: I) -> (InputSession<T, I::Item, isize>, VecCollection<'scope, T, I::Item, isize>)
110 where I: IntoIterator+'static, I::Item: Data {
111 self.new_collection_from_raw(data.into_iter().map(|d| (d, <T as timely::progress::Timestamp>::minimum(), 1)))
112 }
113 fn new_collection_from_raw<D,R,I>(&self, data: I) -> (InputSession<T, D, R>, VecCollection<'scope, T, D, R>)
114 where
115 D: Data,
116 R: Semigroup+'static,
117 I: IntoIterator<Item=(D,T,R)>+'static,
118 {
119 use timely::dataflow::operators::ToStream;
120
121 let (handle, stream) = self.new_input();
122 let source = data.to_stream(*self).as_collection();
123
124 (InputSession::from(handle), stream.as_collection().concat(source))
125 }
126}
127
128/// An input session wrapping a single timely dataflow capability.
129///
130/// Each timely dataflow message has a corresponding capability, which is a logical time in the
131/// timely dataflow system. Differential dataflow updates can happen at a much higher rate than
132/// timely dataflow's progress tracking infrastructure supports, because the logical times are
133/// promoted to data and updates are batched together. The `InputSession` type does this batching.
134///
135/// # Examples
136///
137/// ```
138/// use timely::Config;
139/// use differential_dataflow::input::Input;
140///
141/// ::timely::execute(Config::thread(), |worker| {
142///
143/// let (mut handle, probe) = worker.dataflow(|scope| {
144/// // create input handle and collection.
145/// let (handle, data) = scope.new_collection_from(0 .. 10);
146/// let (probe, _) = data.map(|x| x * 2)
147/// .inspect(|x| println!("{:?}", x))
148/// .probe();
149/// (handle, probe)
150/// });
151///
152/// handle.insert(3);
153/// handle.advance_to(1);
154/// handle.insert(5);
155/// handle.advance_to(2);
156/// handle.flush();
157///
158/// while probe.less_than(handle.time()) {
159/// worker.step();
160/// }
161///
162/// handle.remove(5);
163/// handle.advance_to(3);
164/// handle.flush();
165///
166/// while probe.less_than(handle.time()) {
167/// worker.step();
168/// }
169///
170/// }).unwrap();
171/// ```
172pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup+'static> {
173 time: T,
174 buffer: Vec<(D, T, R)>,
175 handle: Handle<T,(D,T,R)>,
176}
177
178impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
179 /// Adds an element to the collection.
180 pub fn insert(&mut self, element: D) { self.update(element, 1); }
181 /// Removes an element from the collection.
182 pub fn remove(&mut self, element: D) { self.update(element,-1); }
183}
184
185impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> InputSession<T, D, R> {
186
187 /// Introduces a handle as collection.
188 pub fn to_collection<'scope>(&mut self, scope: Scope<'scope, T>) -> VecCollection<'scope, T, D, R>
189 where
190 T: timely::order::TotalOrder,
191 {
192 scope
193 .input_from(&mut self.handle)
194 .as_collection()
195 }
196
197 /// Allocates a new input handle.
198 pub fn new() -> Self {
199 let handle: Handle<T,_> = Handle::new();
200 InputSession {
201 time: handle.time().clone(),
202 buffer: Vec::new(),
203 handle,
204 }
205 }
206
207 /// Creates a new session from a reference to an input handle.
208 pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
209 InputSession {
210 time: handle.time().clone(),
211 buffer: Vec::new(),
212 handle,
213 }
214 }
215
216 /// Adds to the weight of an element in the collection.
217 pub fn update(&mut self, element: D, change: R) {
218 if self.buffer.len() == self.buffer.capacity() {
219 if !self.buffer.is_empty() {
220 self.handle.send_batch(&mut self.buffer);
221 }
222 // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
223 self.buffer.reserve(1024);
224 }
225 self.buffer.push((element, self.time.clone(), change));
226 }
227
228 /// Adds to the weight of an element in the collection at a future time.
229 pub fn update_at(&mut self, element: D, time: T, change: R) {
230 assert!(self.time.less_equal(&time));
231 if self.buffer.len() == self.buffer.capacity() {
232 if !self.buffer.is_empty() {
233 self.handle.send_batch(&mut self.buffer);
234 }
235 // TODO : This is a fairly arbitrary choice; should probably use `Context::default_size()` or such.
236 self.buffer.reserve(1024);
237 }
238 self.buffer.push((element, time, change));
239 }
240
241 /// Forces buffered data into the timely dataflow input, and advances its time to match that of the session.
242 ///
243 /// It is important to call `flush` before expecting timely dataflow to report progress. Until this method is
244 /// called, all updates may still be in internal buffers and not exposed to timely dataflow. Once the method is
245 /// called, all buffers are flushed and timely dataflow is advised that some logical times are no longer possible.
246 pub fn flush(&mut self) {
247 self.handle.send_batch(&mut self.buffer);
248 if self.handle.time().less_than(&self.time) {
249 self.handle.advance_to(self.time.clone());
250 }
251 }
252
253 /// Advances the logical time for future records.
254 ///
255 /// Importantly, this method does **not** immediately inform timely dataflow of the change. This happens only when
256 /// the session is dropped or flushed. It is not correct to use this time as a basis for a computation's `step_while`
257 /// method unless the session has just been flushed.
258 pub fn advance_to(&mut self, time: T) {
259 assert!(self.handle.time().less_equal(&time));
260 assert!(&self.time.less_equal(&time));
261 self.time = time;
262 }
263
264 /// Reveals the current time of the session.
265 pub fn epoch(&self) -> &T { &self.time }
266 /// Reveals the current time of the session.
267 pub fn time(&self) -> &T { &self.time }
268
269 /// Closes the input, flushing and sealing the wrapped timely input.
270 pub fn close(self) { }
271}
272
273impl<T: Timestamp+Clone, D: Data, R: Semigroup+'static> Drop for InputSession<T, D, R> {
274 fn drop(&mut self) {
275 self.flush();
276 }
277}