differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher-level style of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Data;
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::{Scope, Stream};
18use timely::dataflow::operators::*;
19
20use crate::difference::{Semigroup, Abelian, Multiply};
21use crate::lattice::Lattice;
22use crate::hashable::Hashable;
23
24/// A mutable collection of values of type `D`
25///
26/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
27/// differential dataflow computation, you write as if the collection is a static dataset to which you
28/// apply functional transformations, creating new collections. Once your computation is written, you
29/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
30/// propagate changes through your functional computation and report the corresponding changes to the
31/// output collections.
32///
33/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
34/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
35/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
36/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
37/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
38/// defaults to) `isize`, representing changes to the occurrence count of each record.
39#[derive(Clone)]
40pub struct Collection<G: Scope, D, R: Semigroup = isize> {
41 /// The underlying timely dataflow stream.
42 ///
43 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
44 /// not intended to be the idiomatic way to work with the collection.
45 pub inner: Stream<G, (D, G::Timestamp, R)>
46}
47
48impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
49 /// Creates a new Collection from a timely dataflow stream.
50 ///
51 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
52 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
53 /// provides a `new_collection` method which will create a new collection for you without exposing
54 /// the underlying timely stream at all.
55 pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
56 Collection { inner: stream }
57 }
58 /// Creates a new collection by applying the supplied function to each input element.
59 ///
60 /// # Examples
61 ///
62 /// ```
63 /// use differential_dataflow::input::Input;
64 ///
65 /// ::timely::example(|scope| {
66 /// scope.new_collection_from(1 .. 10).1
67 /// .map(|x| x * 2)
68 /// .filter(|x| x % 2 == 1)
69 /// .assert_empty();
70 /// });
71 /// ```
72 pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
73 where D2: Data,
74 L: FnMut(D) -> D2 + 'static
75 {
76 self.inner
77 .map(move |(data, time, delta)| (logic(data), time, delta))
78 .as_collection()
79 }
80 /// Creates a new collection by applying the supplied function to each input element.
81 ///
82 /// Although the name suggests in-place mutation, this function does not change the source collection,
83 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
84 /// equivalent to `map`, but can be more efficient.
85 ///
86 /// # Examples
87 ///
88 /// ```
89 /// use differential_dataflow::input::Input;
90 ///
91 /// ::timely::example(|scope| {
92 /// scope.new_collection_from(1 .. 10).1
93 /// .map_in_place(|x| *x *= 2)
94 /// .filter(|x| x % 2 == 1)
95 /// .assert_empty();
96 /// });
97 /// ```
98 pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
99 where L: FnMut(&mut D) + 'static {
100 self.inner
101 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
102 .as_collection()
103 }
104 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
105 ///
106 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
107 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
108 /// attempting to consolidate the results.
109 ///
110 /// # Examples
111 ///
112 /// ```
113 /// use differential_dataflow::input::Input;
114 ///
115 /// ::timely::example(|scope| {
116 /// scope.new_collection_from(1 .. 10).1
117 /// .flat_map(|x| 0 .. x);
118 /// });
119 /// ```
120 pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
121 where G::Timestamp: Clone,
122 I: IntoIterator,
123 I::Item: Data,
124 L: FnMut(D) -> I + 'static {
125 self.inner
126 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
127 .as_collection()
128 }
129 /// Creates a new collection containing those input records satisfying the supplied predicate.
130 ///
131 /// # Examples
132 ///
133 /// ```
134 /// use differential_dataflow::input::Input;
135 ///
136 /// ::timely::example(|scope| {
137 /// scope.new_collection_from(1 .. 10).1
138 /// .map(|x| x * 2)
139 /// .filter(|x| x % 2 == 1)
140 /// .assert_empty();
141 /// });
142 /// ```
143 pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
144 where L: FnMut(&D) -> bool + 'static {
145 self.inner
146 .filter(move |(data, _, _)| logic(data))
147 .as_collection()
148 }
149 /// Creates a new collection accumulating the contents of the two collections.
150 ///
151 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
152 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
153 /// two collections.
154 ///
155 /// # Examples
156 ///
157 /// ```
158 /// use differential_dataflow::input::Input;
159 ///
160 /// ::timely::example(|scope| {
161 ///
162 /// let data = scope.new_collection_from(1 .. 10).1;
163 ///
164 /// let odds = data.filter(|x| x % 2 == 1);
165 /// let evens = data.filter(|x| x % 2 == 0);
166 ///
167 /// odds.concat(&evens)
168 /// .assert_eq(&data);
169 /// });
170 /// ```
171 pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
172 self.inner
173 .concat(&other.inner)
174 .as_collection()
175 }
176 /// Creates a new collection accumulating the contents of the two collections.
177 ///
178 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
179 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
180 /// two collections.
181 ///
182 /// # Examples
183 ///
184 /// ```
185 /// use differential_dataflow::input::Input;
186 ///
187 /// ::timely::example(|scope| {
188 ///
189 /// let data = scope.new_collection_from(1 .. 10).1;
190 ///
191 /// let odds = data.filter(|x| x % 2 == 1);
192 /// let evens = data.filter(|x| x % 2 == 0);
193 ///
194 /// odds.concatenate(Some(evens))
195 /// .assert_eq(&data);
196 /// });
197 /// ```
198 pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
199 where
200 I: IntoIterator<Item=Collection<G, D, R>>
201 {
202 self.inner
203 .concatenate(sources.into_iter().map(|x| x.inner))
204 .as_collection()
205 }
206 /// Replaces each record with another, with a new difference type.
207 ///
208 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
209 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
210 ///
211 /// # Examples
212 ///
213 /// ```
214 /// use differential_dataflow::input::Input;
215 ///
216 /// ::timely::example(|scope| {
217 ///
218 /// let nums = scope.new_collection_from(0 .. 10).1;
219 /// let x1 = nums.flat_map(|x| 0 .. x);
220 /// let x2 = nums.map(|x| (x, 9 - x))
221 /// .explode(|(x,y)| Some((x,y)));
222 ///
223 /// x1.assert_eq(&x2);
224 /// });
225 /// ```
226 pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
227 where D2: Data,
228 R2: Semigroup+Multiply<R>,
229 <R2 as Multiply<R>>::Output: Data+Semigroup,
230 I: IntoIterator<Item=(D2,R2)>,
231 L: FnMut(D)->I+'static,
232 {
233 self.inner
234 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
235 .as_collection()
236 }
237
238 /// Joins each record against a collection defined by the function `logic`.
239 ///
240 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
241 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
242 /// modifications made to the results, namely joining timestamps and multiplying differences.
243 ///
244 /// #Examples
245 ///
246 /// ```
247 /// use differential_dataflow::input::Input;
248 ///
249 /// ::timely::example(|scope| {
250 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
251 /// // for x from 0 through 9.
252 /// scope.new_collection_from(0 .. 10isize).1
253 /// .join_function(|x|
254 /// // data time diff
255 /// vec![(2*x, (3*x) as u64, x),
256 /// (2*x, (4*x) as u64, -x)]
257 /// );
258 /// });
259 /// ```
260 pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
261 where G::Timestamp: Lattice,
262 D2: Data,
263 R2: Semigroup+Multiply<R>,
264 <R2 as Multiply<R>>::Output: Data+Semigroup,
265 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
266 L: FnMut(D)->I+'static,
267 {
268 self.inner
269 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
270 .as_collection()
271 }
272
273 /// Brings a Collection into a nested scope.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// use timely::dataflow::Scope;
279 /// use differential_dataflow::input::Input;
280 ///
281 /// ::timely::example(|scope| {
282 ///
283 /// let data = scope.new_collection_from(1 .. 10).1;
284 ///
285 /// let result = scope.region(|child| {
286 /// data.enter(child)
287 /// .leave()
288 /// });
289 ///
290 /// data.assert_eq(&result);
291 /// });
292 /// ```
293 pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
294 where
295 T: Refines<<G as ScopeParent>::Timestamp>,
296 {
297 self.inner
298 .enter(child)
299 .map(|(data, time, diff)| (data, T::to_inner(time), diff))
300 .as_collection()
301 }
302
303 /// Brings a Collection into a nested scope, at varying times.
304 ///
305 /// The `initial` function indicates the time at which each element of the Collection should appear.
306 ///
307 /// # Examples
308 ///
309 /// ```
310 /// use timely::dataflow::Scope;
311 /// use differential_dataflow::input::Input;
312 ///
313 /// ::timely::example(|scope| {
314 ///
315 /// let data = scope.new_collection_from(1 .. 10).1;
316 ///
317 /// let result = scope.iterative::<u64,_,_>(|child| {
318 /// data.enter_at(child, |x| *x)
319 /// .leave()
320 /// });
321 ///
322 /// data.assert_eq(&result);
323 /// });
324 /// ```
325 pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection<Iterative<'a, G, T>, D, R>
326 where
327 T: Timestamp+Hash,
328 F: FnMut(&D) -> T + Clone + 'static,
329 G::Timestamp: Hash,
330 {
331
332 let mut initial1 = initial.clone();
333 let mut initial2 = initial.clone();
334
335 self.inner
336 .enter_at(child, move |x| initial1(&x.0))
337 .map(move |(data, time, diff)| {
338 let new_time = Product::new(time, initial2(&data));
339 (data, new_time, diff)
340 })
341 .as_collection()
342 }
343
344 /// Brings a Collection into a nested region.
345 ///
346 /// This method is a specialization of `enter` to the case where the nested scope is a region.
347 /// It removes the need for an operator that adjusts the timestamp.
348 pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
349 {
350 self.inner
351 .enter(child)
352 .as_collection()
353 }
354
355 /// Delays each difference by a supplied function.
356 ///
357 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
358 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
359 /// ordered, they should have the same order once `func` is applied to them (this is because we advance the
360 /// timely capability with the same logic, and it must remain `less_equal` to all of the data timestamps).
361 pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
362 where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static {
363
364 let mut func1 = func.clone();
365 let mut func2 = func.clone();
366
367 self.inner
368 .delay_batch(move |x| func1(x))
369 .map_in_place(move |x| x.1 = func2(&x.1))
370 .as_collection()
371 }
372 /// Applies a supplied function to each update.
373 ///
374 /// This method is most commonly used to report information back to the user, often for debugging purposes.
375 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
376 /// not guarantee that it will be called as many times as you might expect.
377 ///
378 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
379 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
380 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
381 /// interesting and less intuitive, unfortunately.
382 ///
383 /// # Examples
384 ///
385 /// ```
386 /// use differential_dataflow::input::Input;
387 ///
388 /// ::timely::example(|scope| {
389 /// scope.new_collection_from(1 .. 10).1
390 /// .map_in_place(|x| *x *= 2)
391 /// .filter(|x| x % 2 == 1)
392 /// .inspect(|x| println!("error: {:?}", x));
393 /// });
394 /// ```
395 pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
396 where F: FnMut(&(D, G::Timestamp, R))+'static {
397 self.inner
398 .inspect(func)
399 .as_collection()
400 }
401 /// Applies a supplied function to each batch of updates.
402 ///
403 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
404 /// timely dataflow capability associated with the batch of updates. The observed batching depends
405 /// on how the system executes, and may vary run to run.
406 ///
407 /// # Examples
408 ///
409 /// ```
410 /// use differential_dataflow::input::Input;
411 ///
412 /// ::timely::example(|scope| {
413 /// scope.new_collection_from(1 .. 10).1
414 /// .map_in_place(|x| *x *= 2)
415 /// .filter(|x| x % 2 == 1)
416 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
417 /// });
418 /// ```
419 pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
420 where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static {
421 self.inner
422 .inspect_batch(func)
423 .as_collection()
424 }
425 /// Attaches a timely dataflow probe to the output of a Collection.
426 ///
427 /// This probe is used to determine when the state of the Collection has stabilized and can
428 /// be read out.
429 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
430 self.inner
431 .probe()
432 }
433 /// Attaches a timely dataflow probe to the output of a Collection.
434 ///
435 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
436 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
437 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
438 /// avoid swamping the system.
439 pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
440 self.inner
441 .probe_with(handle)
442 .as_collection()
443 }
444
445 /// Assert if the collection is ever non-empty.
446 ///
447 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
448 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
449 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
450 /// program should indicate that this assertion never found cause to complain.
451 ///
452 /// # Examples
453 ///
454 /// ```
455 /// use differential_dataflow::input::Input;
456 ///
457 /// ::timely::example(|scope| {
458 /// scope.new_collection_from(1 .. 10).1
459 /// .map(|x| x * 2)
460 /// .filter(|x| x % 2 == 1)
461 /// .assert_empty();
462 /// });
463 /// ```
464 pub fn assert_empty(&self)
465 where D: crate::ExchangeData+Hashable,
466 R: crate::ExchangeData+Hashable,
467 G::Timestamp: Lattice+Ord,
468 {
469 self.consolidate()
470 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
471 }
472
473 /// The scope containing the underlying timely dataflow stream.
474 pub fn scope(&self) -> G {
475 self.inner.scope()
476 }
477}
478
479use timely::dataflow::scopes::ScopeParent;
480use timely::progress::timestamp::Refines;
481
482/// Methods requiring a nested scope.
483impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection<Child<'a, G, T>, D, R>
484where
485 T: Refines<<G as ScopeParent>::Timestamp>,
486{
487 /// Returns the final value of a Collection from a nested scope to its containing scope.
488 ///
489 /// # Examples
490 ///
491 /// ```
492 /// use timely::dataflow::Scope;
493 /// use differential_dataflow::input::Input;
494 ///
495 /// ::timely::example(|scope| {
496 ///
497 /// let data = scope.new_collection_from(1 .. 10).1;
498 ///
499 /// let result = scope.region(|child| {
500 /// data.enter(child)
501 /// .leave()
502 /// });
503 ///
504 /// data.assert_eq(&result);
505 /// });
506 /// ```
507 pub fn leave(&self) -> Collection<G, D, R> {
508 self.inner
509 .leave()
510 .map(|(data, time, diff)| (data, time.to_outer(), diff))
511 .as_collection()
512 }
513}
514
515/// Methods requiring a region as the scope.
516impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>, D, R>
517{
518 /// Returns the value of a Collection from a nested region to its containing scope.
519 ///
520 /// This method is a specialization of `leave` to the case that of a nested region.
521 /// It removes the need for an operator that adjusts the timestamp.
522 pub fn leave_region(&self) -> Collection<G, D, R> {
523 self.inner
524 .leave()
525 .as_collection()
526 }
527}
528
529/// Methods requiring an Abelian difference, to support negation.
530impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data {
531 /// Creates a new collection whose counts are the negation of those in the input.
532 ///
533 /// This method is most commonly used with `concat` to get those element in one collection but not another.
534 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
535 /// including negative counts.
536 ///
537 /// # Examples
538 ///
539 /// ```
540 /// use differential_dataflow::input::Input;
541 ///
542 /// ::timely::example(|scope| {
543 ///
544 /// let data = scope.new_collection_from(1 .. 10).1;
545 ///
546 /// let odds = data.filter(|x| x % 2 == 1);
547 /// let evens = data.filter(|x| x % 2 == 0);
548 ///
549 /// odds.negate()
550 /// .concat(&data)
551 /// .assert_eq(&evens);
552 /// });
553 /// ```
554 pub fn negate(&self) -> Collection<G, D, R> {
555 self.inner
556 .map_in_place(|x| x.2 = x.2.clone().negate())
557 .as_collection()
558 }
559
560
561 /// Assert if the collections are ever different.
562 ///
563 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
564 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
565 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
566 /// indicate that this assertion never found cause to complain.
567 ///
568 /// # Examples
569 ///
570 /// ```
571 /// use differential_dataflow::input::Input;
572 ///
573 /// ::timely::example(|scope| {
574 ///
575 /// let data = scope.new_collection_from(1 .. 10).1;
576 ///
577 /// let odds = data.filter(|x| x % 2 == 1);
578 /// let evens = data.filter(|x| x % 2 == 0);
579 ///
580 /// odds.concat(&evens)
581 /// .assert_eq(&data);
582 /// });
583 /// ```
584 pub fn assert_eq(&self, other: &Self)
585 where D: crate::ExchangeData+Hashable,
586 R: crate::ExchangeData+Hashable,
587 G::Timestamp: Lattice+Ord
588 {
589 self.negate()
590 .concat(other)
591 .assert_empty();
592 }
593}
594
595/// Conversion to a differential dataflow Collection.
596pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
597 /// Converts the type to a differential dataflow collection.
598 fn as_collection(&self) -> Collection<G, D, R>;
599}
600
601impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
602 fn as_collection(&self) -> Collection<G, D, R> {
603 Collection::new(self.clone())
604 }
605}
606
607/// Concatenates multiple collections.
608///
609/// This method has the effect of a sequence of calls to `concat`, but it does
610/// so in one operator rather than a chain of many operators.
611///
612/// # Examples
613///
614/// ```
615/// use differential_dataflow::input::Input;
616///
617/// ::timely::example(|scope| {
618///
619/// let data = scope.new_collection_from(1 .. 10).1;
620///
621/// let odds = data.filter(|x| x % 2 == 1);
622/// let evens = data.filter(|x| x % 2 == 0);
623///
624/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
625/// .assert_eq(&data);
626/// });
627/// ```
628pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
629where
630 G: Scope,
631 D: Data,
632 R: Semigroup,
633 I: IntoIterator<Item=Collection<G, D, R>>,
634{
635 scope
636 .concatenate(iterator.into_iter().map(|x| x.inner))
637 .as_collection()
638}