Skip to main content

differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use timely::Container;
12use timely::progress::Timestamp;
13use timely::dataflow::{Scope, Stream};
14use timely::dataflow::operators::*;
15
16use crate::difference::Abelian;
17
18/// An evolving collection represented by a stream of abstract containers.
19///
20/// The containers purport to reperesent changes to a collection, and they must implement various traits
21/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
22/// on the containers, and streams of containers, are left to the container implementor to describe.
23#[derive(Clone)]
24pub struct Collection<'scope, T: Timestamp, C: 'static> {
25    /// The underlying timely dataflow stream.
26    ///
27    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
28    /// not intended to be the idiomatic way to work with the collection.
29    ///
30    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
31    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
32    /// unexpectedly.
33    pub inner: Stream<'scope, T, C>,
34}
35
36impl<'scope, T: Timestamp, C> Collection<'scope, T, C> {
37    /// Creates a new Collection from a timely dataflow stream.
38    ///
39    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
40    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
41    /// provides a `new_collection` method which will create a new collection for you without exposing
42    /// the underlying timely stream at all.
43    ///
44    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
45    /// method does not check it.
46    pub fn new(stream: Stream<'scope, T, C>) -> Self { Self { inner: stream } }
47}
48impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> {
49    /// Creates a new collection accumulating the contents of the two collections.
50    ///
51    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
52    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
53    /// two collections.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use differential_dataflow::input::Input;
59    ///
60    /// ::timely::example(|scope| {
61    ///
62    ///     let data = scope.new_collection_from(1 .. 10).1;
63    ///
64    ///     let odds = data.clone().filter(|x| x % 2 == 1);
65    ///     let evens = data.clone().filter(|x| x % 2 == 0);
66    ///
67    ///     odds.concat(evens)
68    ///         .assert_eq(data);
69    /// });
70    /// ```
71    pub fn concat(self, other: Self) -> Self {
72        self.inner
73            .concat(other.inner)
74            .as_collection()
75    }
76    /// Creates a new collection accumulating the contents of the two collections.
77    ///
78    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
79    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
80    /// two collections.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// use differential_dataflow::input::Input;
86    ///
87    /// ::timely::example(|scope| {
88    ///
89    ///     let data = scope.new_collection_from(1 .. 10).1;
90    ///
91    ///     let odds = data.clone().filter(|x| x % 2 == 1);
92    ///     let evens = data.clone().filter(|x| x % 2 == 0);
93    ///
94    ///     odds.concatenate(Some(evens))
95    ///         .assert_eq(data);
96    /// });
97    /// ```
98    pub fn concatenate<I>(self, sources: I) -> Self
99    where
100        I: IntoIterator<Item=Self>
101    {
102        self.inner
103            .scope()
104            .concatenate(sources.into_iter().map(|x| x.inner).chain([self.inner]))
105            .as_collection()
106    }
107    // Brings a Collection into a nested region.
108    ///
109    /// This method is a specialization of `enter` to the case where the nested scope is a region.
110    /// It removes the need for an operator that adjusts the timestamp.
111    pub fn enter_region<'inner>(self, child: Scope<'inner, T>) -> Collection<'inner, T, C> {
112        self.inner
113            .enter(child)
114            .as_collection()
115    }
116    /// Applies a supplied function to each batch of updates.
117    ///
118    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
119    /// timely dataflow capability associated with the batch of updates. The observed batching depends
120    /// on how the system executes, and may vary run to run.
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use differential_dataflow::input::Input;
126    ///
127    /// ::timely::example(|scope| {
128    ///     scope.new_collection_from(1 .. 10).1
129    ///          .map_in_place(|x| *x *= 2)
130    ///          .filter(|x| x % 2 == 1)
131    ///          .inspect_container(|event| println!("event: {:?}", event));
132    /// });
133    /// ```
134    pub fn inspect_container<F>(self, func: F) -> Self
135    where
136        F: FnMut(Result<(&T, &C), &[T]>)+'static,
137    {
138        self.inner
139            .inspect_container(func)
140            .as_collection()
141    }
142    /// Attaches a timely dataflow probe to the output of a Collection.
143    ///
144    /// This probe is used to determine when the state of the Collection has stabilized and can
145    /// be read out.
146    pub fn probe(self) -> (probe::Handle<T>, Self) {
147        let (handle, stream) = self.inner.probe();
148        (handle, stream.as_collection())
149    }
150    /// Attaches a timely dataflow probe to the output of a Collection.
151    ///
152    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
153    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
154    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
155    /// avoid swamping the system.
156    pub fn probe_with(self, handle: &probe::Handle<T>) -> Self {
157        Self::new(self.inner.probe_with(handle))
158    }
159    /// The scope containing the underlying timely dataflow stream.
160    pub fn scope(&self) -> Scope<'scope, T> {
161        self.inner.scope()
162    }
163
164    /// Creates a new collection whose counts are the negation of those in the input.
165    ///
166    /// This method is most commonly used with `concat` to get those element in one collection but not another.
167    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
168    /// including negative counts.
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// use differential_dataflow::input::Input;
174    ///
175    /// ::timely::example(|scope| {
176    ///
177    ///     let data = scope.new_collection_from(1 .. 10).1;
178    ///
179    ///     let odds = data.clone().filter(|x| x % 2 == 1);
180    ///     let evens = data.clone().filter(|x| x % 2 == 0);
181    ///
182    ///     odds.negate()
183    ///         .concat(data)
184    ///         .assert_eq(evens);
185    /// });
186    /// ```
187    pub fn negate(self) -> Self where C: containers::Negate {
188        use timely::dataflow::channels::pact::Pipeline;
189        self.inner
190            .unary(Pipeline, "Negate", move |_,_| move |input, output| {
191                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
192            })
193            .as_collection()
194    }
195
196    /// Brings a Collection into a nested scope.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use timely::dataflow::Scope;
202    /// use differential_dataflow::input::Input;
203    ///
204    /// ::timely::example(|scope| {
205    ///
206    ///     let data = scope.new_collection_from(1 .. 10).1;
207    ///
208    ///     let result = scope.region(|child| {
209    ///         data.clone()
210    ///             .enter(child)
211    ///             .leave(scope)
212    ///     });
213    ///
214    ///     data.assert_eq(result);
215    /// });
216    /// ```
217    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Collection<'inner, TInner, <C as containers::Enter<T, TInner>>::InnerContainer>
218    where
219        C: containers::Enter<T, TInner, InnerContainer: Container>,
220        TInner: Refines<T>,
221    {
222        use timely::dataflow::channels::pact::Pipeline;
223        self.inner
224            .enter(child)
225            .unary(Pipeline, "Enter", move |_,_| move |input, output| {
226                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
227            })
228            .as_collection()
229    }
230
231    /// Advances a timestamp in the stream according to the timestamp actions on the path.
232    ///
233    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
234    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
235    ///
236    /// # Examples
237    /// ```
238    /// use timely::dataflow::Scope;
239    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::BranchWhen};
240    ///
241    /// use differential_dataflow::input::Input;
242    ///
243    /// timely::example(|scope| {
244    ///     let summary1 = 5;
245    ///
246    ///     let data = scope.new_collection_from(1 .. 10).1;
247    ///     /// Applies `results_in` on every timestamp in the collection.
248    ///     data.results_in(summary1);
249    /// });
250    /// ```
251    pub fn results_in(self, step: T::Summary) -> Self
252    where
253        C: containers::ResultsIn<T::Summary>,
254    {
255        use timely::dataflow::channels::pact::Pipeline;
256        self.inner
257            .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
258                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
259            })
260            .as_collection()
261    }
262}
263
264use timely::progress::timestamp::Refines;
265
266/// Methods requiring a nested scope.
267impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C>
268{
269    /// Returns the final value of a Collection from a nested scope to its containing scope.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use timely::dataflow::Scope;
275    /// use differential_dataflow::input::Input;
276    ///
277    /// ::timely::example(|scope| {
278    ///
279    ///    let data = scope.new_collection_from(1 .. 10).1;
280    ///
281    ///    let result = scope.region(|child| {
282    ///         data.clone()
283    ///             .enter(child)
284    ///             .leave(scope)
285    ///     });
286    ///
287    ///     data.assert_eq(result);
288    /// });
289    /// ```
290    pub fn leave<'outer, TOuter>(self, outer: Scope<'outer, TOuter>) -> Collection<'outer, TOuter, <C as containers::Leave<T, TOuter>>::OuterContainer>
291    where
292        TOuter: Timestamp,
293        T: Refines<TOuter>,
294        C: containers::Leave<T, TOuter, OuterContainer: Container>,
295    {
296        use timely::dataflow::channels::pact::Pipeline;
297        self.inner
298            .leave(outer)
299            .unary(Pipeline, "Leave", move |_,_| move |input, output| {
300                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
301            })
302            .as_collection()
303    }
304
305    /// Returns the value of a Collection from a nested region to its containing scope.
306    ///
307    /// This method is a specialization of `leave` to the case that of a nested region.
308    /// It removes the need for an operator that adjusts the timestamp.
309    pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> Collection<'outer, T, C> {
310        self.inner
311            .leave(outer)
312            .as_collection()
313    }
314}
315
316pub use vec::Collection as VecCollection;
317/// Specializations of `Collection` that use `Vec` as the container.
318pub mod vec {
319
320    use std::hash::Hash;
321
322    use timely::progress::Timestamp;
323    use timely::order::Product;
324    use timely::dataflow::scope::Iterative;
325    use timely::dataflow::operators::*;
326    use timely::dataflow::operators::vec::*;
327
328    use crate::collection::AsCollection;
329    use crate::difference::{Semigroup, Abelian, Multiply};
330    use crate::lattice::Lattice;
331    use crate::hashable::Hashable;
332
333    /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
334    ///
335    /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
336    /// differential dataflow computation, you write as if the collection is a static dataset to which you
337    /// apply functional transformations, creating new collections. Once your computation is written, you
338    /// are able to mutate the collection (by inserting and removing elements); differential dataflow will
339    /// propagate changes through your functional computation and report the corresponding changes to the
340    /// output collections.
341    ///
342    /// Each vec collection has three generic parameters. The parameter `T` is the timestamp type of the
343    /// scope in which the collection exists; as you write more complicated programs you may wish to
344    /// introduce nested scopes (e.g. for iteration), and this parameter tracks the scope's timestamp
345    /// (for timely dataflow's benefit). The `D` parameter is the type of data in your collection, for
346    /// example `String`, or `(u32, Vec<Option<()>>)`.
347    /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
348    /// defaults to) `isize`, representing changes to the occurrence count of each record.
349    ///
350    /// This type definition instantiates the [`Collection`] type with a `Vec<(D, T, R)>`.
351    pub type Collection<'scope, T, D, R = isize> = super::Collection<'scope, T, Vec<(D, T, R)>>;
352
353
354    impl<'scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<'scope, T, D, R> {
355        /// Creates a new collection by applying the supplied function to each input element.
356        ///
357        /// # Examples
358        ///
359        /// ```
360        /// use differential_dataflow::input::Input;
361        ///
362        /// ::timely::example(|scope| {
363        ///     scope.new_collection_from(1 .. 10).1
364        ///          .map(|x| x * 2)
365        ///          .filter(|x| x % 2 == 1)
366        ///          .assert_empty();
367        /// });
368        /// ```
369        pub fn map<D2, L>(self, mut logic: L) -> Collection<'scope, T, D2, R>
370        where
371            D2: Clone+'static,
372            L: FnMut(D) -> D2 + 'static,
373        {
374            self.inner
375                .map(move |(data, time, delta)| (logic(data), time, delta))
376                .as_collection()
377        }
378        /// Creates a new collection by applying the supplied function to each input element.
379        ///
380        /// Although the name suggests in-place mutation, this function does not change the source collection,
381        /// but rather re-uses the underlying allocations in its implementation. The method is semantically
382        /// equivalent to `map`, but can be more efficient.
383        ///
384        /// # Examples
385        ///
386        /// ```
387        /// use differential_dataflow::input::Input;
388        ///
389        /// ::timely::example(|scope| {
390        ///     scope.new_collection_from(1 .. 10).1
391        ///          .map_in_place(|x| *x *= 2)
392        ///          .filter(|x| x % 2 == 1)
393        ///          .assert_empty();
394        /// });
395        /// ```
396        pub fn map_in_place<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
397        where
398            L: FnMut(&mut D) + 'static,
399        {
400            self.inner
401                .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
402                .as_collection()
403        }
404        /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
405        ///
406        /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
407        /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
408        /// attempting to consolidate the results.
409        ///
410        /// # Examples
411        ///
412        /// ```
413        /// use differential_dataflow::input::Input;
414        ///
415        /// ::timely::example(|scope| {
416        ///     scope.new_collection_from(1 .. 10).1
417        ///          .flat_map(|x| 0 .. x);
418        /// });
419        /// ```
420        pub fn flat_map<I, L>(self, mut logic: L) -> Collection<'scope, T, I::Item, R>
421        where
422            T: Clone,
423            I: IntoIterator<Item: Clone+'static>,
424            L: FnMut(D) -> I + 'static,
425        {
426            self.inner
427                .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
428                .as_collection()
429        }
430        /// Creates a new collection containing those input records satisfying the supplied predicate.
431        ///
432        /// # Examples
433        ///
434        /// ```
435        /// use differential_dataflow::input::Input;
436        ///
437        /// ::timely::example(|scope| {
438        ///     scope.new_collection_from(1 .. 10).1
439        ///          .map(|x| x * 2)
440        ///          .filter(|x| x % 2 == 1)
441        ///          .assert_empty();
442        /// });
443        /// ```
444        pub fn filter<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
445        where
446            L: FnMut(&D) -> bool + 'static,
447        {
448            self.inner
449                .filter(move |(data, _, _)| logic(data))
450                .as_collection()
451        }
452        /// Replaces each record with another, with a new difference type.
453        ///
454        /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
455        /// and move the data into the difference component. This will allow differential dataflow to update in-place.
456        ///
457        /// # Examples
458        ///
459        /// ```
460        /// use differential_dataflow::input::Input;
461        ///
462        /// ::timely::example(|scope| {
463        ///
464        ///     let nums = scope.new_collection_from(0 .. 10).1;
465        ///     let x1 = nums.clone().flat_map(|x| 0 .. x);
466        ///     let x2 = nums.map(|x| (x, 9 - x))
467        ///                  .explode(|(x,y)| Some((x,y)));
468        ///
469        ///     x1.assert_eq(x2);
470        /// });
471        /// ```
472        pub fn explode<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
473        where
474            D2: Clone+'static,
475            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
476            I: IntoIterator<Item=(D2,R2)>,
477            L: FnMut(D)->I+'static,
478        {
479            self.inner
480                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
481                .as_collection()
482        }
483
484        /// Joins each record against a collection defined by the function `logic`.
485        ///
486        /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
487        /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
488        /// modifications made to the results, namely joining timestamps and multiplying differences.
489        ///
490        /// #Examples
491        ///
492        /// ```
493        /// use differential_dataflow::input::Input;
494        ///
495        /// ::timely::example(|scope| {
496        ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
497        ///     // for x from 0 through 9.
498        ///     scope.new_collection_from(0 .. 10isize).1
499        ///          .join_function(|x|
500        ///              //   data      time      diff
501        ///              vec![(2*x, (3*x) as u64,  x),
502        ///                   (2*x, (4*x) as u64, -x)]
503        ///           );
504        /// });
505        /// ```
506        pub fn join_function<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
507        where
508            T: Lattice,
509            D2: Clone+'static,
510            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
511            I: IntoIterator<Item=(D2,T,R2)>,
512            L: FnMut(D)->I+'static,
513        {
514            self.inner
515                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
516                .as_collection()
517        }
518
519        /// Brings a Collection into a nested scope, at varying times.
520        ///
521        /// The `initial` function indicates the time at which each element of the Collection should appear.
522        ///
523        /// # Examples
524        ///
525        /// ```
526        /// use timely::dataflow::Scope;
527        /// use differential_dataflow::input::Input;
528        ///
529        /// ::timely::example(|scope| {
530        ///
531        ///     let data = scope.new_collection_from(1 .. 10).1;
532        ///
533        ///     let result = scope.iterative::<u64,_,_>(|child| {
534        ///         data.clone()
535        ///             .enter_at(child, |x| *x)
536        ///             .leave(scope)
537        ///     });
538        ///
539        ///     data.assert_eq(result);
540        /// });
541        /// ```
542        pub fn enter_at<'inner, TInner, F>(self, child: Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product<T, TInner>, D, R>
543        where
544            TInner: Timestamp+Hash,
545            F: FnMut(&D) -> TInner + Clone + 'static,
546        {
547            self.inner
548                .enter(child)
549                .map(move |(data, time, diff)| {
550                    let new_time = Product::new(time, initial(&data));
551                    (data, new_time, diff)
552                })
553                .as_collection()
554        }
555
556        /// Delays each difference by a supplied function.
557        ///
558        /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
559        /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
560        /// ordered, they should have the same order or compare equal once `func` is applied to them (this
561        /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
562        /// to all of the data timestamps).
563        pub fn delay<F>(self, func: F) -> Collection<'scope, T, D, R>
564        where
565            T: Hash,
566            F: FnMut(&T) -> T + Clone + 'static,
567        {
568            let mut func1 = func.clone();
569            let mut func2 = func.clone();
570
571            self.inner
572                .delay_batch(move |x| func1(x))
573                .map_in_place(move |x| x.1 = func2(&x.1))
574                .as_collection()
575        }
576
577        /// Applies a supplied function to each update.
578        ///
579        /// This method is most commonly used to report information back to the user, often for debugging purposes.
580        /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
581        /// not guarantee that it will be called as many times as you might expect.
582        ///
583        /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
584        /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
585        /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
586        /// interesting and less intuitive, unfortunately.
587        ///
588        /// # Examples
589        ///
590        /// ```
591        /// use differential_dataflow::input::Input;
592        ///
593        /// ::timely::example(|scope| {
594        ///     scope.new_collection_from(1 .. 10).1
595        ///          .map_in_place(|x| *x *= 2)
596        ///          .filter(|x| x % 2 == 1)
597        ///          .inspect(|x| println!("error: {:?}", x));
598        /// });
599        /// ```
600        pub fn inspect<F>(self, func: F) -> Collection<'scope, T, D, R>
601        where
602            F: FnMut(&(D, T, R))+'static,
603        {
604            self.inner
605                .inspect(func)
606                .as_collection()
607        }
608        /// Applies a supplied function to each batch of updates.
609        ///
610        /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
611        /// timely dataflow capability associated with the batch of updates. The observed batching depends
612        /// on how the system executes, and may vary run to run.
613        ///
614        /// # Examples
615        ///
616        /// ```
617        /// use differential_dataflow::input::Input;
618        ///
619        /// ::timely::example(|scope| {
620        ///     scope.new_collection_from(1 .. 10).1
621        ///          .map_in_place(|x| *x *= 2)
622        ///          .filter(|x| x % 2 == 1)
623        ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
624        /// });
625        /// ```
626        pub fn inspect_batch<F>(self, mut func: F) -> Collection<'scope, T, D, R>
627        where
628            F: FnMut(&T, &[(D, T, R)])+'static,
629        {
630            self.inner
631                .inspect_batch(move |time, data| func(time, data))
632                .as_collection()
633        }
634
635        /// Assert if the collection is ever non-empty.
636        ///
637        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
638        /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
639        /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
640        /// program should indicate that this assertion never found cause to complain.
641        ///
642        /// # Examples
643        ///
644        /// ```
645        /// use differential_dataflow::input::Input;
646        ///
647        /// ::timely::example(|scope| {
648        ///     scope.new_collection_from(1 .. 10).1
649        ///          .map(|x| x * 2)
650        ///          .filter(|x| x % 2 == 1)
651        ///          .assert_empty();
652        /// });
653        /// ```
654        pub fn assert_empty(self)
655        where
656            D: crate::ExchangeData+Hashable,
657            R: crate::ExchangeData+Hashable + Semigroup,
658            T: Lattice+Ord,
659        {
660            self.consolidate()
661                .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
662        }
663    }
664
665    /// Methods requiring an Abelian difference, to support negation.
666    impl<'scope, T: Timestamp + Clone + 'static, D: Clone+'static, R: Abelian+'static> Collection<'scope, T, D, R> {
667        /// Assert if the collections are ever different.
668        ///
669        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
670        /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
671        /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
672        /// indicate that this assertion never found cause to complain.
673        ///
674        /// # Examples
675        ///
676        /// ```
677        /// use differential_dataflow::input::Input;
678        ///
679        /// ::timely::example(|scope| {
680        ///
681        ///     let data = scope.new_collection_from(1 .. 10).1;
682        ///
683        ///     let odds = data.clone().filter(|x| x % 2 == 1);
684        ///     let evens = data.clone().filter(|x| x % 2 == 0);
685        ///
686        ///     odds.concat(evens)
687        ///         .assert_eq(data);
688        /// });
689        /// ```
690        pub fn assert_eq(self, other: Self)
691        where
692            D: crate::ExchangeData+Hashable,
693            R: crate::ExchangeData+Hashable,
694            T: Lattice+Ord,
695        {
696            self.negate()
697                .concat(other)
698                .assert_empty();
699        }
700    }
701
702    use crate::trace::{Trace, Builder};
703    use crate::operators::arrange::{Arranged, TraceAgent};
704
705    impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
706    where
707        T: Timestamp + Lattice + Ord,
708        K: crate::ExchangeData+Hashable,
709        V: crate::ExchangeData,
710        R: crate::ExchangeData+Semigroup,
711    {
712        /// Applies a reduction function on records grouped by key.
713        ///
714        /// Input data must be structured as `(key, val)` pairs.
715        /// The user-supplied reduction function takes as arguments
716        ///
717        /// 1. a reference to the key,
718        /// 2. a reference to the slice of values and their accumulated updates,
719        /// 3. a mutuable reference to a vector to populate with output values and accumulated updates.
720        ///
721        /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
722        /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
723        /// `Ord` implementations.
724        ///
725        /// # Examples
726        ///
727        /// ```
728        /// use differential_dataflow::input::Input;
729        ///
730        /// ::timely::example(|scope| {
731        ///     // report the smallest value for each group
732        ///     scope.new_collection_from(1 .. 10).1
733        ///          .map(|x| (x / 3, x))
734        ///          .reduce(|_key, input, output| {
735        ///              output.push((*input[0].0, 1))
736        ///          });
737        /// });
738        /// ```
739        pub fn reduce<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, logic: L) -> Collection<'scope, T, (K, V2), R2>
740        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
741            self.reduce_named("Reduce", logic)
742        }
743
744        /// As `reduce` with the ability to name the operator.
745        pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, name: &str, logic: L) -> Collection<'scope, T, (K, V2), R2>
746        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
747            use crate::trace::implementations::{ValBuilder, ValSpine};
748
749            self.arrange_by_key_named(&format!("Arrange: {}", name))
750                .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>,_>(
751                    name,
752                    logic,
753                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
754                )
755                .as_collection(|k,v| (k.clone(), v.clone()))
756        }
757
758        /// Applies `reduce` to arranged data, and returns an arrangement of output data.
759        ///
760        /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
761        /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
762        ///
763        /// # Examples
764        ///
765        /// ```
766        /// use differential_dataflow::input::Input;
767        /// use differential_dataflow::trace::Trace;
768        /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
769        ///
770        /// ::timely::example(|scope| {
771        ///
772        ///     let trace =
773        ///     scope.new_collection_from(1 .. 10u32).1
774        ///          .map(|x| (x, x))
775        ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
776        ///             "Example",
777        ///              move |_key, src, dst| dst.push((*src[0].0, 1))
778        ///          )
779        ///          .trace;
780        /// });
781        /// ```
782        pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent<T2>>
783        where
784            T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static,
785            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
786            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
787        {
788            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
789                if !input.is_empty() { logic(key, input, change); }
790                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
791                crate::consolidation::consolidate(change);
792            })
793        }
794
795        /// Solves for output updates when presented with inputs and would-be outputs.
796        ///
797        /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
798        /// and it may not be safe to index into the first element.
799        /// At least one of the two collections will be non-empty.
800        pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
801        where
802            V: Clone+'static,
803            T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=T>+'static,
804            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
805            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
806        {
807            self.arrange_by_key_named(&format!("Arrange: {}", name))
808                .reduce_core::<_,Bu,_,_>(
809                    name,
810                    logic,
811                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
812                )
813        }
814    }
815
816    impl<'scope, T, K, R1> Collection<'scope, T, K, R1>
817    where
818        T: Timestamp + Lattice + Ord,
819        K: crate::ExchangeData+Hashable,
820        R1: crate::ExchangeData+Semigroup
821    {
822
823        /// Reduces the collection to one occurrence of each distinct element.
824        ///
825        /// # Examples
826        ///
827        /// ```
828        /// use differential_dataflow::input::Input;
829        ///
830        /// ::timely::example(|scope| {
831        ///     // report at most one of each key.
832        ///     scope.new_collection_from(1 .. 10).1
833        ///          .map(|x| x / 3)
834        ///          .distinct();
835        /// });
836        /// ```
837        pub fn distinct(self) -> Collection<'scope, T, K, isize> {
838            self.distinct_core()
839        }
840
841        /// Distinct for general integer differences.
842        ///
843        /// This method allows `distinct` to produce collections whose difference
844        /// type is something other than an `isize` integer, for example perhaps an
845        /// `i32`.
846        pub fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(self) -> Collection<'scope, T, K, R2> {
847            self.threshold_named("Distinct", |_,_| R2::from(1i8))
848        }
849
850        /// Transforms the multiplicity of records.
851        ///
852        /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
853        /// least the computation may behave as if it does. Otherwise, the transformation
854        /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
855        ///
856        /// # Examples
857        ///
858        /// ```
859        /// use differential_dataflow::input::Input;
860        ///
861        /// ::timely::example(|scope| {
862        ///     // report at most one of each key.
863        ///     scope.new_collection_from(1 .. 10).1
864        ///          .map(|x| x / 3)
865        ///          .threshold(|_,c| c % 2);
866        /// });
867        /// ```
868        pub fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(self, thresh: F) -> Collection<'scope, T, K, R2> {
869            self.threshold_named("Threshold", thresh)
870        }
871
872        /// A `threshold` with the ability to name the operator.
873        pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(self, name: &str, mut thresh: F) -> Collection<'scope, T, K, R2> {
874            use crate::trace::implementations::{KeyBuilder, KeySpine};
875
876            self.arrange_by_self_named(&format!("Arrange: {}", name))
877                .reduce_abelian::<_,KeyBuilder<K,T,R2>,KeySpine<K,T,R2>,_>(
878                    name,
879                    move |k,s,t| t.push(((), thresh(k, &s[0].1))),
880                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
881                )
882                .as_collection(|k,_| k.clone())
883        }
884
885    }
886
887    impl<'scope, T, K, R> Collection<'scope, T, K, R>
888    where
889        T: Timestamp + Lattice + Ord,
890        K: crate::ExchangeData+Hashable,
891        R: crate::ExchangeData+Semigroup
892    {
893
894        /// Counts the number of occurrences of each element.
895        ///
896        /// # Examples
897        ///
898        /// ```
899        /// use differential_dataflow::input::Input;
900        ///
901        /// ::timely::example(|scope| {
902        ///     // report the number of occurrences of each key
903        ///     scope.new_collection_from(1 .. 10).1
904        ///          .map(|x| x / 3)
905        ///          .count();
906        /// });
907        /// ```
908        pub fn count(self) -> Collection<'scope, T, (K, R), isize> { self.count_core() }
909
910        /// Count for general integer differences.
911        ///
912        /// This method allows `count` to produce collections whose difference
913        /// type is something other than an `isize` integer, for example perhaps an
914        /// `i32`.
915        pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<'scope, T, (K, R), R2> {
916            use crate::trace::implementations::{ValBuilder, ValSpine};
917            self.arrange_by_self_named("Arrange: Count")
918                .reduce_abelian::<_,ValBuilder<K,R,T,R2>,ValSpine<K,R,T,R2>,_>(
919                    "Count",
920                    |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))),
921                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
922                )
923                .as_collection(|k,c| (k.clone(), c.clone()))
924        }
925    }
926
927    /// Methods which require data be arrangeable.
928    impl<'scope, T, D, R> Collection<'scope, T, D, R>
929    where
930        T: Timestamp + Clone + 'static + Lattice,
931        D: crate::ExchangeData+Hashable,
932        R: crate::ExchangeData+Semigroup,
933    {
934        /// Aggregates the weights of equal records into at most one record.
935        ///
936        /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
937        /// accumulated in place, each held back until their timestamp has completed.
938        ///
939        /// # Examples
940        ///
941        /// ```
942        /// use differential_dataflow::input::Input;
943        ///
944        /// ::timely::example(|scope| {
945        ///
946        ///     let x = scope.new_collection_from(1 .. 10u32).1;
947        ///
948        ///     x.clone()
949        ///      .negate()
950        ///      .concat(x)
951        ///      .consolidate() // <-- ensures cancellation occurs
952        ///      .assert_empty();
953        /// });
954        /// ```
955        pub fn consolidate(self) -> Self {
956            use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
957            self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
958        }
959
960        /// As `consolidate` but with the ability to name the operator, specify the trace type,
961        /// and provide the function `reify` to produce owned keys and values..
962        pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
963        where
964            Ba: crate::trace::Batcher<Input=Vec<((D,()),T,R)>, Time=T> + 'static,
965            Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
966            Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
967            F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
968        {
969            use crate::operators::arrange::arrangement::Arrange;
970            self.map(|k| (k, ()))
971                .arrange_named::<Ba, Bu, Tr>(name)
972                .as_collection(reify)
973        }
974
975        /// Aggregates the weights of equal records.
976        ///
977        /// Unlike `consolidate`, this method does not exchange data and does not
978        /// ensure that at most one copy of each `(data, time)` pair exists in the
979        /// results. Instead, it acts on each batch of data and collapses equivalent
980        /// `(data, time)` pairs found therein, suppressing any that accumulate to
981        /// zero.
982        ///
983        /// # Examples
984        ///
985        /// ```
986        /// use differential_dataflow::input::Input;
987        ///
988        /// ::timely::example(|scope| {
989        ///
990        ///     let x = scope.new_collection_from(1 .. 10u32).1;
991        ///
992        ///     // nothing to assert, as no particular guarantees.
993        ///     x.clone()
994        ///      .negate()
995        ///      .concat(x)
996        ///      .consolidate_stream();
997        /// });
998        /// ```
999        pub fn consolidate_stream(self) -> Self {
1000
1001            use timely::dataflow::channels::pact::Pipeline;
1002            use timely::dataflow::operators::Operator;
1003            use crate::collection::AsCollection;
1004            use crate::consolidation::ConsolidatingContainerBuilder;
1005
1006            self.inner
1007                .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
1008
1009                    move |input, output| {
1010                        input.for_each(|time, data| {
1011                            output.session_with_builder(&time).give_iterator(data.drain(..));
1012                        })
1013                    }
1014                })
1015                .as_collection()
1016        }
1017    }
1018
1019    use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
1020    use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
1021    use crate::operators::arrange::Arrange;
1022
1023    impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
1024    where
1025        T: Timestamp + Lattice,
1026        K: crate::ExchangeData + Hashable,
1027        V: crate::ExchangeData,
1028        R: crate::ExchangeData + Semigroup,
1029    {
1030        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1031        where
1032            Ba: crate::trace::Batcher<Input=Vec<((K, V), T, R)>, Time=T> + 'static,
1033            Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
1034            Tr: crate::trace::Trace<Time=T> + 'static,
1035        {
1036            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into());
1037            crate::operators::arrange::arrangement::arrange_core::<_, Ba, Bu, _>(self.inner, exchange, name)
1038        }
1039    }
1040
1041    impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R>
1042    where
1043        T: Timestamp + Lattice + Ord,
1044    {
1045        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1046        where
1047            Ba: crate::trace::Batcher<Input=Vec<((K,()),T,R)>, Time=T> + 'static,
1048            Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
1049            Tr: crate::trace::Trace<Time=T> + 'static,
1050        {
1051            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into());
1052            crate::operators::arrange::arrangement::arrange_core::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
1053        }
1054    }
1055
1056
1057    impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R>
1058    where
1059        T: Timestamp + Lattice + Ord,
1060    {
1061        /// Arranges a collection of `(Key, Val)` records by `Key`.
1062        ///
1063        /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
1064        /// This trace is current for all times completed by the output stream, which can be used to
1065        /// safely identify the stable times and values in the trace.
1066        pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1067            self.arrange_by_key_named("ArrangeByKey")
1068        }
1069
1070        /// As `arrange_by_key` but with the ability to name the arrangement.
1071        pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1072            self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
1073        }
1074    }
1075
1076    impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R>
1077    where
1078        T: Timestamp + Lattice + Ord,
1079    {
1080        /// Arranges a collection of `Key` records by `Key`.
1081        ///
1082        /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
1083        /// This trace is current for all times complete in the output stream, which can be used to safely
1084        /// identify the stable times and values in the trace.
1085        pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1086            self.arrange_by_self_named("ArrangeBySelf")
1087        }
1088
1089        /// As `arrange_by_self` but with the ability to name the arrangement.
1090        pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1091            self.map(|k| (k, ()))
1092                .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
1093        }
1094    }
1095
1096    impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
1097    where
1098        T: Timestamp + Lattice + Ord,
1099        K: crate::ExchangeData+Hashable,
1100        V: crate::ExchangeData,
1101        R: crate::ExchangeData+Semigroup,
1102    {
1103        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1104        ///
1105        /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
1106        ///
1107        /// # Examples
1108        ///
1109        /// ```
1110        /// use differential_dataflow::input::Input;
1111        ///
1112        /// ::timely::example(|scope| {
1113        ///
1114        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1115        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1116        ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1117        ///
1118        ///     x.join(y)
1119        ///      .assert_eq(z);
1120        /// });
1121        /// ```
1122        pub fn join<V2, R2>(self, other: Collection<'scope, T, (K,V2), R2>) -> Collection<'scope, T, (K,(V,V2)), <R as Multiply<R2>>::Output>
1123        where
1124            K:  crate::ExchangeData,
1125            V2: crate::ExchangeData,
1126            R2: crate::ExchangeData+Semigroup,
1127            R: Multiply<R2, Output: Semigroup+'static>,
1128        {
1129            self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1130        }
1131
1132        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1133        ///
1134        /// # Examples
1135        ///
1136        /// ```
1137        /// use differential_dataflow::input::Input;
1138        ///
1139        /// ::timely::example(|scope| {
1140        ///
1141        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1142        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1143        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1144        ///
1145        ///     x.join_map(y, |_key, &a, &b| (a,b))
1146        ///      .assert_eq(z);
1147        /// });
1148        /// ```
1149        pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(self, other: Collection<'scope, T, (K, V2), R2>, mut logic: L) -> Collection<'scope, T, D, <R as Multiply<R2>>::Output>
1150        where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1151            let arranged1 = self.arrange_by_key();
1152            let arranged2 = other.arrange_by_key();
1153            arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1154        }
1155
1156        /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1157        ///
1158        /// When the second collection contains frequencies that are either zero or one this is the more traditional
1159        /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1160        /// the counts of the records in the first input.
1161        ///
1162        /// # Examples
1163        ///
1164        /// ```
1165        /// use differential_dataflow::input::Input;
1166        ///
1167        /// ::timely::example(|scope| {
1168        ///
1169        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1170        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1171        ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
1172        ///
1173        ///     x.semijoin(y)
1174        ///      .assert_eq(z);
1175        /// });
1176        /// ```
1177        pub fn semijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), <R as Multiply<R2>>::Output>
1178        where R: Multiply<R2, Output: Semigroup+'static> {
1179            let arranged1 = self.arrange_by_key();
1180            let arranged2 = other.arrange_by_self();
1181            arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone())))
1182        }
1183
1184        /// Subtracts the semijoin with `other` from `self`.
1185        ///
1186        /// In the case that `other` has multiplicities zero or one this results
1187        /// in a relational antijoin, in which we discard input records whose key
1188        /// is present in `other`. If the multiplicities could be other than zero
1189        /// or one, the semantic interpretation of this operator is less clear.
1190        ///
1191        /// In almost all cases, you should ensure that `other` has multiplicities
1192        /// that are zero or one, perhaps by using the `distinct` operator.
1193        ///
1194        /// # Examples
1195        ///
1196        /// ```
1197        /// use differential_dataflow::input::Input;
1198        ///
1199        /// ::timely::example(|scope| {
1200        ///
1201        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1202        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1203        ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
1204        ///
1205        ///     x.antijoin(y)
1206        ///      .assert_eq(z);
1207        /// });
1208        /// ```
1209        pub fn antijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), R>
1210        where R: Multiply<R2, Output=R>, R: Abelian+'static {
1211            self.clone().concat(self.semijoin(other).negate())
1212        }
1213
1214        /// Joins two arranged collections with the same key type.
1215        ///
1216        /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1217        /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1218        /// every value returned by the iterator.
1219        ///
1220        /// # Examples
1221        ///
1222        /// ```
1223        /// use differential_dataflow::input::Input;
1224        /// use differential_dataflow::trace::Trace;
1225        ///
1226        /// ::timely::example(|scope| {
1227        ///
1228        ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1229        ///                  .arrange_by_key();
1230        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1231        ///                  .arrange_by_key();
1232        ///
1233        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1234        ///
1235        ///     x.join_core(y, |_key, &a, &b| Some((a, b)))
1236        ///      .assert_eq(z);
1237        /// });
1238        /// ```
1239        pub fn join_core<Tr2,I,L> (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1240        where
1241            Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=T>+Clone+'static,
1242            R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1243            I: IntoIterator<Item: crate::Data>,
1244            L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1245        {
1246            self.arrange_by_key()
1247                .join_core(stream2, result)
1248        }
1249    }
1250}
1251
1252/// Conversion to a differential dataflow Collection.
1253pub trait AsCollection<'scope, T: Timestamp, C> {
1254    /// Converts the type to a differential dataflow collection.
1255    fn as_collection(self) -> Collection<'scope, T, C>;
1256}
1257
1258impl<'scope, T: Timestamp, C> AsCollection<'scope, T, C> for Stream<'scope, T, C> {
1259    /// Converts the type to a differential dataflow collection.
1260    ///
1261    /// By calling this method, you guarantee that the timestamp invariant (as documented on
1262    /// [Collection]) is upheld. This method will not check it.
1263    fn as_collection(self) -> Collection<'scope, T, C> {
1264        Collection::<T,C>::new(self)
1265    }
1266}
1267
1268/// Concatenates multiple collections.
1269///
1270/// This method has the effect of a sequence of calls to `concat`, but it does
1271/// so in one operator rather than a chain of many operators.
1272///
1273/// # Examples
1274///
1275/// ```
1276/// use differential_dataflow::input::Input;
1277///
1278/// ::timely::example(|scope| {
1279///
1280///     let data = scope.new_collection_from(1 .. 10).1;
1281///
1282///     let odds = data.clone().filter(|x| x % 2 == 1);
1283///     let evens = data.clone().filter(|x| x % 2 == 0);
1284///
1285///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
1286///         .assert_eq(data);
1287/// });
1288/// ```
1289pub fn concatenate<'scope, T, C, I>(scope: Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C>
1290where
1291    T: Timestamp,
1292    C: Container,
1293    I: IntoIterator<Item=Collection<'scope, T, C>>,
1294{
1295    scope
1296        .concatenate(iterator.into_iter().map(|x| x.inner))
1297        .as_collection()
1298}
1299
1300/// Traits that can be implemented by containers to provide functionality to collections based on them.
1301pub mod containers {
1302
1303    /// A container that can negate its updates.
1304    pub trait Negate {
1305        /// Negates Abelian differences of each update.
1306        fn negate(self) -> Self;
1307    }
1308
1309    /// A container that can enter from timestamp `T1` into timestamp `T2`.
1310    pub trait Enter<T1, T2> {
1311        /// The resulting container type.
1312        type InnerContainer;
1313        /// Update timestamps from `T1` to `T2`.
1314        fn enter(self) -> Self::InnerContainer;
1315    }
1316
1317    /// A container that can leave from timestamp `T1` into timestamp `T2`.
1318    pub trait Leave<T1, T2> {
1319        /// The resulting container type.
1320        type OuterContainer;
1321        /// Update timestamps from `T1` to `T2`.
1322        fn leave(self) -> Self::OuterContainer;
1323    }
1324
1325    /// A container that can advance timestamps by a summary `TS`.
1326    pub trait ResultsIn<TS> {
1327        /// Advance times in the container by `step`.
1328        fn results_in(self, step: &TS) -> Self;
1329    }
1330
1331
1332    /// Implementations of container traits for the `Vec` container.
1333    mod vec {
1334
1335        use timely::progress::{Timestamp, timestamp::Refines};
1336        use crate::collection::Abelian;
1337
1338        use super::{Negate, Enter, Leave, ResultsIn};
1339
1340        impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
1341            fn negate(mut self) -> Self {
1342                for (_data, _time, diff) in self.iter_mut() { diff.negate(); }
1343                self
1344            }
1345        }
1346
1347        impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
1348            type InnerContainer = Vec<(D, T2, R)>;
1349            fn enter(self) -> Self::InnerContainer {
1350                self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
1351            }
1352        }
1353
1354        impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
1355            type OuterContainer = Vec<(D, T2, R)>;
1356            fn leave(self) -> Self::OuterContainer {
1357                self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
1358            }
1359        }
1360
1361        impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
1362            fn results_in(self, step: &T::Summary) -> Self {
1363                use timely::progress::PathSummary;
1364                self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
1365            }
1366        }
1367    }
1368
1369    /// Implementations of container traits for the `Rc` container.
1370    mod rc {
1371        use std::rc::Rc;
1372
1373        use timely::progress::{Timestamp, timestamp::Refines};
1374
1375        use super::{Negate, Enter, Leave, ResultsIn};
1376
1377        impl<C: Negate+Clone+Default> Negate for Rc<C> {
1378            fn negate(mut self) -> Self {
1379                std::mem::take(Rc::make_mut(&mut self)).negate().into()
1380            }
1381        }
1382
1383        impl<C: Enter<T1, T2>+Clone+Default, T1: Timestamp, T2: Refines<T1>> Enter<T1, T2> for Rc<C> {
1384            type InnerContainer = Rc<C::InnerContainer>;
1385            fn enter(mut self) -> Self::InnerContainer {
1386                std::mem::take(Rc::make_mut(&mut self)).enter().into()
1387            }
1388        }
1389
1390        impl<C: Leave<T1, T2>+Clone+Default, T1: Refines<T2>, T2: Timestamp> Leave<T1, T2> for Rc<C> {
1391            type OuterContainer = Rc<C::OuterContainer>;
1392            fn leave(mut self) -> Self::OuterContainer {
1393                std::mem::take(Rc::make_mut(&mut self)).leave().into()
1394            }
1395        }
1396
1397        impl<C: ResultsIn<TS>+Clone+Default, TS> ResultsIn<TS> for Rc<C> {
1398            fn results_in(mut self, step: &TS) -> Self {
1399                std::mem::take(Rc::make_mut(&mut self)).results_in(step).into()
1400            }
1401        }
1402    }
1403}