differential_dataflow/
collection.rs

1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
8//! manually. This higher level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::Data;
14use timely::progress::Timestamp;
15use timely::order::Product;
16use timely::dataflow::scopes::{Child, child::Iterative};
17use timely::dataflow::{Scope, Stream};
18use timely::dataflow::operators::*;
19
20use crate::difference::{Semigroup, Abelian, Multiply};
21use crate::lattice::Lattice;
22use crate::hashable::Hashable;
23
24/// A mutable collection of values of type `D`
25///
26/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
27/// differential dataflow computation, you write as if the collection is a static dataset to which you
28/// apply functional transformations, creating new collections. Once your computation is written, you
29/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
30/// propagate changes through your functional computation and report the corresponding changes to the
31/// output collections.
32///
33/// Each collection has three generic parameters. The parameter `G` is for the scope in which the
34/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
35/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
36/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
37/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
38/// defaults to) `isize`, representing changes to the occurrence count of each record.
39#[derive(Clone)]
40pub struct Collection<G: Scope, D, R: Semigroup = isize> {
41    /// The underlying timely dataflow stream.
42    ///
43    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
44    /// not intended to be the idiomatic way to work with the collection.
45    pub inner: Stream<G, (D, G::Timestamp, R)>
46}
47
48impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
49    /// Creates a new Collection from a timely dataflow stream.
50    ///
51    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
52    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
53    /// provides a `new_collection` method which will create a new collection for you without exposing
54    /// the underlying timely stream at all.
55    pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
56        Collection { inner: stream }
57    }
58    /// Creates a new collection by applying the supplied function to each input element.
59    ///
60    /// # Examples
61    ///
62    /// ```
63    /// use differential_dataflow::input::Input;
64    ///
65    /// ::timely::example(|scope| {
66    ///     scope.new_collection_from(1 .. 10).1
67    ///          .map(|x| x * 2)
68    ///          .filter(|x| x % 2 == 1)
69    ///          .assert_empty();
70    /// });
71    /// ```
72    pub fn map<D2, L>(&self, mut logic: L) -> Collection<G, D2, R>
73    where D2: Data,
74          L: FnMut(D) -> D2 + 'static
75    {
76        self.inner
77            .map(move |(data, time, delta)| (logic(data), time, delta))
78            .as_collection()
79    }
80    /// Creates a new collection by applying the supplied function to each input element.
81    ///
82    /// Although the name suggests in-place mutation, this function does not change the source collection,
83    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
84    /// equivalent to `map`, but can be more efficient.
85    ///
86    /// # Examples
87    ///
88    /// ```
89    /// use differential_dataflow::input::Input;
90    ///
91    /// ::timely::example(|scope| {
92    ///     scope.new_collection_from(1 .. 10).1
93    ///          .map_in_place(|x| *x *= 2)
94    ///          .filter(|x| x % 2 == 1)
95    ///          .assert_empty();
96    /// });
97    /// ```
98    pub fn map_in_place<L>(&self, mut logic: L) -> Collection<G, D, R>
99    where L: FnMut(&mut D) + 'static {
100        self.inner
101            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
102            .as_collection()
103    }
104    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
105    ///
106    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
107    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
108    /// attempting to consolidate the results.
109    ///
110    /// # Examples
111    ///
112    /// ```
113    /// use differential_dataflow::input::Input;
114    ///
115    /// ::timely::example(|scope| {
116    ///     scope.new_collection_from(1 .. 10).1
117    ///          .flat_map(|x| 0 .. x);
118    /// });
119    /// ```
120    pub fn flat_map<I, L>(&self, mut logic: L) -> Collection<G, I::Item, R>
121        where G::Timestamp: Clone,
122              I: IntoIterator,
123              I::Item: Data,
124              L: FnMut(D) -> I + 'static {
125        self.inner
126            .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
127            .as_collection()
128    }
129    /// Creates a new collection containing those input records satisfying the supplied predicate.
130    ///
131    /// # Examples
132    ///
133    /// ```
134    /// use differential_dataflow::input::Input;
135    ///
136    /// ::timely::example(|scope| {
137    ///     scope.new_collection_from(1 .. 10).1
138    ///          .map(|x| x * 2)
139    ///          .filter(|x| x % 2 == 1)
140    ///          .assert_empty();
141    /// });
142    /// ```
143    pub fn filter<L>(&self, mut logic: L) -> Collection<G, D, R>
144    where L: FnMut(&D) -> bool + 'static {
145        self.inner
146            .filter(move |(data, _, _)| logic(data))
147            .as_collection()
148    }
149    /// Creates a new collection accumulating the contents of the two collections.
150    ///
151    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
152    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
153    /// two collections.
154    ///
155    /// # Examples
156    ///
157    /// ```
158    /// use differential_dataflow::input::Input;
159    ///
160    /// ::timely::example(|scope| {
161    ///
162    ///     let data = scope.new_collection_from(1 .. 10).1;
163    ///
164    ///     let odds = data.filter(|x| x % 2 == 1);
165    ///     let evens = data.filter(|x| x % 2 == 0);
166    ///
167    ///     odds.concat(&evens)
168    ///         .assert_eq(&data);
169    /// });
170    /// ```
171    pub fn concat(&self, other: &Collection<G, D, R>) -> Collection<G, D, R> {
172        self.inner
173            .concat(&other.inner)
174            .as_collection()
175    }
176    /// Creates a new collection accumulating the contents of the two collections.
177    ///
178    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
179    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
180    /// two collections.
181    ///
182    /// # Examples
183    ///
184    /// ```
185    /// use differential_dataflow::input::Input;
186    ///
187    /// ::timely::example(|scope| {
188    ///
189    ///     let data = scope.new_collection_from(1 .. 10).1;
190    ///
191    ///     let odds = data.filter(|x| x % 2 == 1);
192    ///     let evens = data.filter(|x| x % 2 == 0);
193    ///
194    ///     odds.concatenate(Some(evens))
195    ///         .assert_eq(&data);
196    /// });
197    /// ```
198    pub fn concatenate<I>(&self, sources: I) -> Collection<G, D, R>
199    where
200        I: IntoIterator<Item=Collection<G, D, R>>
201    {
202        self.inner
203            .concatenate(sources.into_iter().map(|x| x.inner))
204            .as_collection()
205    }
206    /// Replaces each record with another, with a new difference type.
207    ///
208    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
209    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
210    ///
211    /// # Examples
212    ///
213    /// ```
214    /// use differential_dataflow::input::Input;
215    ///
216    /// ::timely::example(|scope| {
217    ///
218    ///     let nums = scope.new_collection_from(0 .. 10).1;
219    ///     let x1 = nums.flat_map(|x| 0 .. x);
220    ///     let x2 = nums.map(|x| (x, 9 - x))
221    ///                  .explode(|(x,y)| Some((x,y)));
222    ///
223    ///     x1.assert_eq(&x2);
224    /// });
225    /// ```
226    pub fn explode<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
227    where D2: Data,
228          R2: Semigroup+Multiply<R>,
229          <R2 as Multiply<R>>::Output: Data+Semigroup,
230          I: IntoIterator<Item=(D2,R2)>,
231          L: FnMut(D)->I+'static,
232    {
233        self.inner
234            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
235            .as_collection()
236    }
237
238    /// Joins each record against a collection defined by the function `logic`.
239    ///
240    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
241    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
242    /// modifications made to the results, namely joining timestamps and multiplying differences.
243    ///
244    /// #Examples
245    ///
246    /// ```
247    /// use differential_dataflow::input::Input;
248    ///
249    /// ::timely::example(|scope| {
250    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
251    ///     // for x from 0 through 9.
252    ///     scope.new_collection_from(0 .. 10isize).1
253    ///          .join_function(|x|
254    ///              //   data      time      diff
255    ///              vec![(2*x, (3*x) as u64,  x),
256    ///                   (2*x, (4*x) as u64, -x)]
257    ///           );
258    /// });
259    /// ```
260    pub fn join_function<D2, R2, I, L>(&self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
261    where G::Timestamp: Lattice,
262          D2: Data,
263          R2: Semigroup+Multiply<R>,
264          <R2 as Multiply<R>>::Output: Data+Semigroup,
265          I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
266          L: FnMut(D)->I+'static,
267    {
268        self.inner
269            .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
270            .as_collection()
271    }
272
273    /// Brings a Collection into a nested scope.
274    ///
275    /// # Examples
276    ///
277    /// ```
278    /// use timely::dataflow::Scope;
279    /// use differential_dataflow::input::Input;
280    ///
281    /// ::timely::example(|scope| {
282    ///
283    ///     let data = scope.new_collection_from(1 .. 10).1;
284    ///
285    ///     let result = scope.region(|child| {
286    ///         data.enter(child)
287    ///             .leave()
288    ///     });
289    ///
290    ///     data.assert_eq(&result);
291    /// });
292    /// ```
293    pub fn enter<'a, T>(&self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, D, R>
294    where
295        T: Refines<<G as ScopeParent>::Timestamp>,
296    {
297        self.inner
298            .enter(child)
299            .map(|(data, time, diff)| (data, T::to_inner(time), diff))
300            .as_collection()
301    }
302
303    /// Brings a Collection into a nested scope, at varying times.
304    ///
305    /// The `initial` function indicates the time at which each element of the Collection should appear.
306    ///
307    /// # Examples
308    ///
309    /// ```
310    /// use timely::dataflow::Scope;
311    /// use differential_dataflow::input::Input;
312    ///
313    /// ::timely::example(|scope| {
314    ///
315    ///     let data = scope.new_collection_from(1 .. 10).1;
316    ///
317    ///     let result = scope.iterative::<u64,_,_>(|child| {
318    ///         data.enter_at(child, |x| *x)
319    ///             .leave()
320    ///     });
321    ///
322    ///     data.assert_eq(&result);
323    /// });
324    /// ```
325    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection<Iterative<'a, G, T>, D, R>
326    where
327        T: Timestamp+Hash,
328        F: FnMut(&D) -> T + Clone + 'static,
329        G::Timestamp: Hash,
330    {
331
332        let mut initial1 = initial.clone();
333        let mut initial2 = initial.clone();
334
335        self.inner
336            .enter_at(child, move |x| initial1(&x.0))
337            .map(move |(data, time, diff)| {
338                let new_time = Product::new(time, initial2(&data));
339                (data, new_time, diff)
340            })
341            .as_collection()
342    }
343
344    /// Brings a Collection into a nested region.
345    ///
346    /// This method is a specialization of `enter` to the case where the nested scope is a region.
347    /// It removes the need for an operator that adjusts the timestamp.
348    pub fn enter_region<'a>(&self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, D, R>
349    {
350        self.inner
351            .enter(child)
352            .as_collection()
353    }
354
355    /// Delays each difference by a supplied function.
356    ///
357    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
358    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
359    /// ordered, they should have the same order once `func` is applied to them (this is because we advance the
360    /// timely capability with the same logic, and it must remain `less_equal` to all of the data timestamps).
361    pub fn delay<F>(&self, func: F) -> Collection<G, D, R>
362    where F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static {
363
364        let mut func1 = func.clone();
365        let mut func2 = func.clone();
366
367        self.inner
368            .delay_batch(move |x| func1(x))
369            .map_in_place(move |x| x.1 = func2(&x.1))
370            .as_collection()
371    }
372    /// Applies a supplied function to each update.
373    ///
374    /// This method is most commonly used to report information back to the user, often for debugging purposes.
375    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
376    /// not guarantee that it will be called as many times as you might expect.
377    ///
378    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
379    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
380    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
381    /// interesting and less intuitive, unfortunately.
382    ///
383    /// # Examples
384    ///
385    /// ```
386    /// use differential_dataflow::input::Input;
387    ///
388    /// ::timely::example(|scope| {
389    ///     scope.new_collection_from(1 .. 10).1
390    ///          .map_in_place(|x| *x *= 2)
391    ///          .filter(|x| x % 2 == 1)
392    ///          .inspect(|x| println!("error: {:?}", x));
393    /// });
394    /// ```
395    pub fn inspect<F>(&self, func: F) -> Collection<G, D, R>
396    where F: FnMut(&(D, G::Timestamp, R))+'static {
397        self.inner
398            .inspect(func)
399            .as_collection()
400    }
401    /// Applies a supplied function to each batch of updates.
402    ///
403    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
404    /// timely dataflow capability associated with the batch of updates. The observed batching depends
405    /// on how the system executes, and may vary run to run.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// use differential_dataflow::input::Input;
411    ///
412    /// ::timely::example(|scope| {
413    ///     scope.new_collection_from(1 .. 10).1
414    ///          .map_in_place(|x| *x *= 2)
415    ///          .filter(|x| x % 2 == 1)
416    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
417    /// });
418    /// ```
419    pub fn inspect_batch<F>(&self, func: F) -> Collection<G, D, R>
420    where F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static {
421        self.inner
422            .inspect_batch(func)
423            .as_collection()
424    }
425    /// Attaches a timely dataflow probe to the output of a Collection.
426    ///
427    /// This probe is used to determine when the state of the Collection has stabilized and can
428    /// be read out.
429    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
430        self.inner
431            .probe()
432    }
433    /// Attaches a timely dataflow probe to the output of a Collection.
434    ///
435    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
436    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
437    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
438    /// avoid swamping the system.
439    pub fn probe_with(&self, handle: &mut probe::Handle<G::Timestamp>) -> Collection<G, D, R> {
440        self.inner
441            .probe_with(handle)
442            .as_collection()
443    }
444
445    /// Assert if the collection is ever non-empty.
446    ///
447    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
448    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
449    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
450    /// program should indicate that this assertion never found cause to complain.
451    ///
452    /// # Examples
453    ///
454    /// ```
455    /// use differential_dataflow::input::Input;
456    ///
457    /// ::timely::example(|scope| {
458    ///     scope.new_collection_from(1 .. 10).1
459    ///          .map(|x| x * 2)
460    ///          .filter(|x| x % 2 == 1)
461    ///          .assert_empty();
462    /// });
463    /// ```
464    pub fn assert_empty(&self)
465    where D: crate::ExchangeData+Hashable,
466          R: crate::ExchangeData+Hashable,
467          G::Timestamp: Lattice+Ord,
468    {
469        self.consolidate()
470            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
471    }
472
473    /// The scope containing the underlying timely dataflow stream.
474    pub fn scope(&self) -> G {
475        self.inner.scope()
476    }
477}
478
479use timely::dataflow::scopes::ScopeParent;
480use timely::progress::timestamp::Refines;
481
482/// Methods requiring a nested scope.
483impl<'a, G: Scope, T: Timestamp, D: Data, R: Semigroup> Collection<Child<'a, G, T>, D, R>
484where
485    T: Refines<<G as ScopeParent>::Timestamp>,
486{
487    /// Returns the final value of a Collection from a nested scope to its containing scope.
488    ///
489    /// # Examples
490    ///
491    /// ```
492    /// use timely::dataflow::Scope;
493    /// use differential_dataflow::input::Input;
494    ///
495    /// ::timely::example(|scope| {
496    ///
497    ///    let data = scope.new_collection_from(1 .. 10).1;
498    ///
499    ///    let result = scope.region(|child| {
500    ///         data.enter(child)
501    ///             .leave()
502    ///     });
503    ///
504    ///     data.assert_eq(&result);
505    /// });
506    /// ```
507    pub fn leave(&self) -> Collection<G, D, R> {
508        self.inner
509            .leave()
510            .map(|(data, time, diff)| (data, time.to_outer(), diff))
511            .as_collection()
512    }
513}
514
515/// Methods requiring a region as the scope.
516impl<'a, G: Scope, D: Data, R: Semigroup> Collection<Child<'a, G, G::Timestamp>, D, R>
517{
518    /// Returns the value of a Collection from a nested region to its containing scope.
519    ///
520    /// This method is a specialization of `leave` to the case that of a nested region.
521    /// It removes the need for an operator that adjusts the timestamp.
522    pub fn leave_region(&self) -> Collection<G, D, R> {
523        self.inner
524            .leave()
525            .as_collection()
526    }
527}
528
529/// Methods requiring an Abelian difference, to support negation.
530impl<G: Scope, D: Data, R: Abelian> Collection<G, D, R> where G::Timestamp: Data {
531    /// Creates a new collection whose counts are the negation of those in the input.
532    ///
533    /// This method is most commonly used with `concat` to get those element in one collection but not another.
534    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
535    /// including negative counts.
536    ///
537    /// # Examples
538    ///
539    /// ```
540    /// use differential_dataflow::input::Input;
541    ///
542    /// ::timely::example(|scope| {
543    ///
544    ///     let data = scope.new_collection_from(1 .. 10).1;
545    ///
546    ///     let odds = data.filter(|x| x % 2 == 1);
547    ///     let evens = data.filter(|x| x % 2 == 0);
548    ///
549    ///     odds.negate()
550    ///         .concat(&data)
551    ///         .assert_eq(&evens);
552    /// });
553    /// ```
554    pub fn negate(&self) -> Collection<G, D, R> {
555        self.inner
556            .map_in_place(|x| x.2 = x.2.clone().negate())
557            .as_collection()
558    }
559
560
561    /// Assert if the collections are ever different.
562    ///
563    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
564    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
565    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
566    /// indicate that this assertion never found cause to complain.
567    ///
568    /// # Examples
569    ///
570    /// ```
571    /// use differential_dataflow::input::Input;
572    ///
573    /// ::timely::example(|scope| {
574    ///
575    ///     let data = scope.new_collection_from(1 .. 10).1;
576    ///
577    ///     let odds = data.filter(|x| x % 2 == 1);
578    ///     let evens = data.filter(|x| x % 2 == 0);
579    ///
580    ///     odds.concat(&evens)
581    ///         .assert_eq(&data);
582    /// });
583    /// ```
584    pub fn assert_eq(&self, other: &Self)
585    where D: crate::ExchangeData+Hashable,
586          R: crate::ExchangeData+Hashable,
587          G::Timestamp: Lattice+Ord
588    {
589        self.negate()
590            .concat(other)
591            .assert_empty();
592    }
593}
594
595/// Conversion to a differential dataflow Collection.
596pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
597    /// Converts the type to a differential dataflow collection.
598    fn as_collection(&self) -> Collection<G, D, R>;
599}
600
601impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
602    fn as_collection(&self) -> Collection<G, D, R> {
603        Collection::new(self.clone())
604    }
605}
606
607/// Concatenates multiple collections.
608///
609/// This method has the effect of a sequence of calls to `concat`, but it does
610/// so in one operator rather than a chain of many operators.
611///
612/// # Examples
613///
614/// ```
615/// use differential_dataflow::input::Input;
616///
617/// ::timely::example(|scope| {
618///
619///     let data = scope.new_collection_from(1 .. 10).1;
620///
621///     let odds = data.filter(|x| x % 2 == 1);
622///     let evens = data.filter(|x| x % 2 == 0);
623///
624///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
625///         .assert_eq(&data);
626/// });
627/// ```
628pub fn concatenate<G, D, R, I>(scope: &mut G, iterator: I) -> Collection<G, D, R>
629where
630    G: Scope,
631    D: Data,
632    R: Semigroup,
633    I: IntoIterator<Item=Collection<G, D, R>>,
634{
635    scope
636        .concatenate(iterator.into_iter().map(|x| x.inner))
637        .as_collection()
638}