Skip to main content

differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use timely::Container;
12use timely::progress::Timestamp;
13use timely::dataflow::{Scope, Stream};
14use timely::dataflow::operators::*;
15
16use crate::difference::Abelian;
17
18/// An evolving collection represented by a stream of abstract containers.
19///
20/// The containers purport to reperesent changes to a collection, and they must implement various traits
21/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
22/// on the containers, and streams of containers, are left to the container implementor to describe.
23#[derive(Clone)]
24pub struct Collection<'scope, T: Timestamp, C: 'static> {
25    /// The underlying timely dataflow stream.
26    ///
27    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
28    /// not intended to be the idiomatic way to work with the collection.
29    ///
30    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
31    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
32    /// unexpectedly.
33    pub inner: Stream<'scope, T, C>,
34}
35
36impl<'scope, T: Timestamp, C> Collection<'scope, T, C> {
37    /// Creates a new Collection from a timely dataflow stream.
38    ///
39    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
40    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
41    /// provides a `new_collection` method which will create a new collection for you without exposing
42    /// the underlying timely stream at all.
43    ///
44    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
45    /// method does not check it.
46    pub fn new(stream: Stream<'scope, T, C>) -> Self { Self { inner: stream } }
47}
48impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> {
49    /// Creates a new collection accumulating the contents of the two collections.
50    ///
51    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
52    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
53    /// two collections.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// use differential_dataflow::input::Input;
59    ///
60    /// ::timely::example(|scope| {
61    ///
62    ///     let data = scope.new_collection_from(1 .. 10).1;
63    ///
64    ///     let odds = data.clone().filter(|x| x % 2 == 1);
65    ///     let evens = data.clone().filter(|x| x % 2 == 0);
66    ///
67    ///     odds.concat(evens)
68    ///         .assert_eq(data);
69    /// });
70    /// ```
71    pub fn concat(self, other: Self) -> Self {
72        self.inner
73            .concat(other.inner)
74            .as_collection()
75    }
76    /// Creates a new collection accumulating the contents of the two collections.
77    ///
78    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
79    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
80    /// two collections.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// use differential_dataflow::input::Input;
86    ///
87    /// ::timely::example(|scope| {
88    ///
89    ///     let data = scope.new_collection_from(1 .. 10).1;
90    ///
91    ///     let odds = data.clone().filter(|x| x % 2 == 1);
92    ///     let evens = data.clone().filter(|x| x % 2 == 0);
93    ///
94    ///     odds.concatenate(Some(evens))
95    ///         .assert_eq(data);
96    /// });
97    /// ```
98    pub fn concatenate<I>(self, sources: I) -> Self
99    where
100        I: IntoIterator<Item=Self>
101    {
102        self.inner
103            .scope()
104            .concatenate(sources.into_iter().map(|x| x.inner).chain([self.inner]))
105            .as_collection()
106    }
107    // Brings a Collection into a nested region.
108    ///
109    /// This method is a specialization of `enter` to the case where the nested scope is a region.
110    /// It removes the need for an operator that adjusts the timestamp.
111    pub fn enter_region<'inner>(self, child: Scope<'inner, T>) -> Collection<'inner, T, C> {
112        self.inner
113            .enter(child)
114            .as_collection()
115    }
116    /// Applies a supplied function to each batch of updates.
117    ///
118    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
119    /// timely dataflow capability associated with the batch of updates. The observed batching depends
120    /// on how the system executes, and may vary run to run.
121    ///
122    /// # Examples
123    ///
124    /// ```
125    /// use differential_dataflow::input::Input;
126    ///
127    /// ::timely::example(|scope| {
128    ///     scope.new_collection_from(1 .. 10).1
129    ///          .map_in_place(|x| *x *= 2)
130    ///          .filter(|x| x % 2 == 1)
131    ///          .inspect_container(|event| println!("event: {:?}", event));
132    /// });
133    /// ```
134    pub fn inspect_container<F>(self, func: F) -> Self
135    where
136        F: FnMut(Result<(&T, &C), &[T]>)+'static,
137    {
138        self.inner
139            .inspect_container(func)
140            .as_collection()
141    }
142    /// Attaches a timely dataflow probe to the output of a Collection.
143    ///
144    /// This probe is used to determine when the state of the Collection has stabilized and can
145    /// be read out.
146    pub fn probe(self) -> (probe::Handle<T>, Self) {
147        let (handle, stream) = self.inner.probe();
148        (handle, stream.as_collection())
149    }
150    /// Attaches a timely dataflow probe to the output of a Collection.
151    ///
152    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
153    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
154    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
155    /// avoid swamping the system.
156    pub fn probe_with(self, handle: &probe::Handle<T>) -> Self {
157        Self::new(self.inner.probe_with(handle))
158    }
159    /// The scope containing the underlying timely dataflow stream.
160    pub fn scope(&self) -> Scope<'scope, T> {
161        self.inner.scope()
162    }
163
164    /// Creates a new collection whose counts are the negation of those in the input.
165    ///
166    /// This method is most commonly used with `concat` to get those element in one collection but not another.
167    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
168    /// including negative counts.
169    ///
170    /// # Examples
171    ///
172    /// ```
173    /// use differential_dataflow::input::Input;
174    ///
175    /// ::timely::example(|scope| {
176    ///
177    ///     let data = scope.new_collection_from(1 .. 10).1;
178    ///
179    ///     let odds = data.clone().filter(|x| x % 2 == 1);
180    ///     let evens = data.clone().filter(|x| x % 2 == 0);
181    ///
182    ///     odds.negate()
183    ///         .concat(data)
184    ///         .assert_eq(evens);
185    /// });
186    /// ```
187    pub fn negate(self) -> Self where C: containers::Negate {
188        use timely::dataflow::channels::pact::Pipeline;
189        self.inner
190            .unary(Pipeline, "Negate", move |_,_| move |input, output| {
191                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
192            })
193            .as_collection()
194    }
195
196    /// Brings a Collection into a nested scope.
197    ///
198    /// # Examples
199    ///
200    /// ```
201    /// use timely::dataflow::Scope;
202    /// use differential_dataflow::input::Input;
203    ///
204    /// ::timely::example(|scope| {
205    ///
206    ///     let data = scope.new_collection_from(1 .. 10).1;
207    ///
208    ///     let result = scope.region(|child| {
209    ///         data.clone()
210    ///             .enter(child)
211    ///             .leave(scope)
212    ///     });
213    ///
214    ///     data.assert_eq(result);
215    /// });
216    /// ```
217    pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Collection<'inner, TInner, <C as containers::Enter<T, TInner>>::InnerContainer>
218    where
219        C: containers::Enter<T, TInner, InnerContainer: Container>,
220        TInner: Refines<T>,
221    {
222        use timely::dataflow::channels::pact::Pipeline;
223        self.inner
224            .enter(child)
225            .unary(Pipeline, "Enter", move |_,_| move |input, output| {
226                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
227            })
228            .as_collection()
229    }
230
231    /// Advances a timestamp in the stream according to the timestamp actions on the path.
232    ///
233    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
234    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
235    ///
236    /// # Examples
237    /// ```
238    /// use timely::dataflow::Scope;
239    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::BranchWhen};
240    ///
241    /// use differential_dataflow::input::Input;
242    ///
243    /// timely::example(|scope| {
244    ///     let summary1 = 5;
245    ///
246    ///     let data = scope.new_collection_from(1 .. 10).1;
247    ///     /// Applies `results_in` on every timestamp in the collection.
248    ///     data.results_in(summary1);
249    /// });
250    /// ```
251    pub fn results_in(self, step: T::Summary) -> Self
252    where
253        C: containers::ResultsIn<T::Summary>,
254    {
255        use timely::dataflow::channels::pact::Pipeline;
256        self.inner
257            .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
258                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
259            })
260            .as_collection()
261    }
262}
263
264use timely::progress::timestamp::Refines;
265
266/// Methods requiring a nested scope.
267impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C>
268{
269    /// Returns the final value of a Collection from a nested scope to its containing scope.
270    ///
271    /// # Examples
272    ///
273    /// ```
274    /// use timely::dataflow::Scope;
275    /// use differential_dataflow::input::Input;
276    ///
277    /// ::timely::example(|scope| {
278    ///
279    ///    let data = scope.new_collection_from(1 .. 10).1;
280    ///
281    ///    let result = scope.region(|child| {
282    ///         data.clone()
283    ///             .enter(child)
284    ///             .leave(scope)
285    ///     });
286    ///
287    ///     data.assert_eq(result);
288    /// });
289    /// ```
290    pub fn leave<'outer, TOuter>(self, outer: Scope<'outer, TOuter>) -> Collection<'outer, TOuter, <C as containers::Leave<T, TOuter>>::OuterContainer>
291    where
292        TOuter: Timestamp,
293        T: Refines<TOuter>,
294        C: containers::Leave<T, TOuter, OuterContainer: Container>,
295    {
296        use timely::dataflow::channels::pact::Pipeline;
297        self.inner
298            .leave(outer)
299            .unary(Pipeline, "Leave", move |_,_| move |input, output| {
300                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
301            })
302            .as_collection()
303    }
304
305    /// Returns the value of a Collection from a nested region to its containing scope.
306    ///
307    /// This method is a specialization of `leave` to the case that of a nested region.
308    /// It removes the need for an operator that adjusts the timestamp.
309    pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> Collection<'outer, T, C> {
310        self.inner
311            .leave(outer)
312            .as_collection()
313    }
314}
315
316pub use vec::Collection as VecCollection;
317/// Specializations of `Collection` that use `Vec` as the container.
318pub mod vec {
319
320    use std::hash::Hash;
321
322    use timely::progress::Timestamp;
323    use timely::order::Product;
324    use timely::dataflow::scope::Iterative;
325    use timely::dataflow::operators::*;
326    use timely::dataflow::operators::vec::*;
327
328    use crate::collection::AsCollection;
329    use crate::difference::{Semigroup, Abelian, Multiply};
330    use crate::lattice::Lattice;
331    use crate::hashable::Hashable;
332
333    /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
334    ///
335    /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
336    /// differential dataflow computation, you write as if the collection is a static dataset to which you
337    /// apply functional transformations, creating new collections. Once your computation is written, you
338    /// are able to mutate the collection (by inserting and removing elements); differential dataflow will
339    /// propagate changes through your functional computation and report the corresponding changes to the
340    /// output collections.
341    ///
342    /// Each vec collection has three generic parameters. The parameter `T` is the timestamp type of the
343    /// scope in which the collection exists; as you write more complicated programs you may wish to
344    /// introduce nested scopes (e.g. for iteration), and this parameter tracks the scope's timestamp
345    /// (for timely dataflow's benefit). The `D` parameter is the type of data in your collection, for
346    /// example `String`, or `(u32, Vec<Option<()>>)`.
347    /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
348    /// defaults to) `isize`, representing changes to the occurrence count of each record.
349    ///
350    /// This type definition instantiates the [`Collection`] type with a `Vec<(D, T, R)>`.
351    pub type Collection<'scope, T, D, R = isize> = super::Collection<'scope, T, Vec<(D, T, R)>>;
352
353
354    impl<'scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<'scope, T, D, R> {
355        /// Creates a new collection by applying the supplied function to each input element.
356        ///
357        /// # Examples
358        ///
359        /// ```
360        /// use differential_dataflow::input::Input;
361        ///
362        /// ::timely::example(|scope| {
363        ///     scope.new_collection_from(1 .. 10).1
364        ///          .map(|x| x * 2)
365        ///          .filter(|x| x % 2 == 1)
366        ///          .assert_empty();
367        /// });
368        /// ```
369        pub fn map<D2, L>(self, mut logic: L) -> Collection<'scope, T, D2, R>
370        where
371            D2: Clone+'static,
372            L: FnMut(D) -> D2 + 'static,
373        {
374            self.inner
375                .map(move |(data, time, delta)| (logic(data), time, delta))
376                .as_collection()
377        }
378        /// Creates a new collection by applying the supplied function to each input element.
379        ///
380        /// Although the name suggests in-place mutation, this function does not change the source collection,
381        /// but rather re-uses the underlying allocations in its implementation. The method is semantically
382        /// equivalent to `map`, but can be more efficient.
383        ///
384        /// # Examples
385        ///
386        /// ```
387        /// use differential_dataflow::input::Input;
388        ///
389        /// ::timely::example(|scope| {
390        ///     scope.new_collection_from(1 .. 10).1
391        ///          .map_in_place(|x| *x *= 2)
392        ///          .filter(|x| x % 2 == 1)
393        ///          .assert_empty();
394        /// });
395        /// ```
396        pub fn map_in_place<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
397        where
398            L: FnMut(&mut D) + 'static,
399        {
400            self.inner
401                .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
402                .as_collection()
403        }
404        /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
405        ///
406        /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
407        /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
408        /// attempting to consolidate the results.
409        ///
410        /// # Examples
411        ///
412        /// ```
413        /// use differential_dataflow::input::Input;
414        ///
415        /// ::timely::example(|scope| {
416        ///     scope.new_collection_from(1 .. 10).1
417        ///          .flat_map(|x| 0 .. x);
418        /// });
419        /// ```
420        pub fn flat_map<I, L>(self, mut logic: L) -> Collection<'scope, T, I::Item, R>
421        where
422            T: Clone,
423            I: IntoIterator<Item: Clone+'static>,
424            L: FnMut(D) -> I + 'static,
425        {
426            self.inner
427                .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
428                .as_collection()
429        }
430        /// Creates a new collection containing those input records satisfying the supplied predicate.
431        ///
432        /// # Examples
433        ///
434        /// ```
435        /// use differential_dataflow::input::Input;
436        ///
437        /// ::timely::example(|scope| {
438        ///     scope.new_collection_from(1 .. 10).1
439        ///          .map(|x| x * 2)
440        ///          .filter(|x| x % 2 == 1)
441        ///          .assert_empty();
442        /// });
443        /// ```
444        pub fn filter<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
445        where
446            L: FnMut(&D) -> bool + 'static,
447        {
448            self.inner
449                .filter(move |(data, _, _)| logic(data))
450                .as_collection()
451        }
452        /// Replaces each record with another, with a new difference type.
453        ///
454        /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
455        /// and move the data into the difference component. This will allow differential dataflow to update in-place.
456        ///
457        /// # Examples
458        ///
459        /// ```
460        /// use differential_dataflow::input::Input;
461        ///
462        /// ::timely::example(|scope| {
463        ///
464        ///     let nums = scope.new_collection_from(0 .. 10).1;
465        ///     let x1 = nums.clone().flat_map(|x| 0 .. x);
466        ///     let x2 = nums.map(|x| (x, 9 - x))
467        ///                  .explode(|(x,y)| Some((x,y)));
468        ///
469        ///     x1.assert_eq(x2);
470        /// });
471        /// ```
472        pub fn explode<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
473        where
474            D2: Clone+'static,
475            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
476            I: IntoIterator<Item=(D2,R2)>,
477            L: FnMut(D)->I+'static,
478        {
479            self.inner
480                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
481                .as_collection()
482        }
483
484        /// Joins each record against a collection defined by the function `logic`.
485        ///
486        /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
487        /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
488        /// modifications made to the results, namely joining timestamps and multiplying differences.
489        ///
490        /// #Examples
491        ///
492        /// ```
493        /// use differential_dataflow::input::Input;
494        ///
495        /// ::timely::example(|scope| {
496        ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
497        ///     // for x from 0 through 9.
498        ///     scope.new_collection_from(0 .. 10isize).1
499        ///          .join_function(|x|
500        ///              //   data      time      diff
501        ///              vec![(2*x, (3*x) as u64,  x),
502        ///                   (2*x, (4*x) as u64, -x)]
503        ///           );
504        /// });
505        /// ```
506        pub fn join_function<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
507        where
508            T: Lattice,
509            D2: Clone+'static,
510            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
511            I: IntoIterator<Item=(D2,T,R2)>,
512            L: FnMut(D)->I+'static,
513        {
514            self.inner
515                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
516                .as_collection()
517        }
518
519        /// Brings a Collection into a nested scope, at varying times.
520        ///
521        /// The `initial` function indicates the time at which each element of the Collection should appear.
522        ///
523        /// # Examples
524        ///
525        /// ```
526        /// use timely::dataflow::Scope;
527        /// use differential_dataflow::input::Input;
528        ///
529        /// ::timely::example(|scope| {
530        ///
531        ///     let data = scope.new_collection_from(1 .. 10).1;
532        ///
533        ///     let result = scope.iterative::<u64,_,_>(|child| {
534        ///         data.clone()
535        ///             .enter_at(child, |x| *x)
536        ///             .leave(scope)
537        ///     });
538        ///
539        ///     data.assert_eq(result);
540        /// });
541        /// ```
542        pub fn enter_at<'inner, TInner, F>(self, child: Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product<T, TInner>, D, R>
543        where
544            TInner: Timestamp+Hash,
545            F: FnMut(&D) -> TInner + Clone + 'static,
546        {
547            self.inner
548                .enter(child)
549                .map(move |(data, time, diff)| {
550                    let new_time = Product::new(time, initial(&data));
551                    (data, new_time, diff)
552                })
553                .as_collection()
554        }
555
556        /// Delays each difference by a supplied function.
557        ///
558        /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
559        /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
560        /// ordered, they should have the same order or compare equal once `func` is applied to them (this
561        /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
562        /// to all of the data timestamps).
563        pub fn delay<F>(self, func: F) -> Collection<'scope, T, D, R>
564        where
565            T: Hash,
566            F: FnMut(&T) -> T + Clone + 'static,
567        {
568            let mut func1 = func.clone();
569            let mut func2 = func.clone();
570
571            self.inner
572                .delay_batch(move |x| func1(x))
573                .map_in_place(move |x| x.1 = func2(&x.1))
574                .as_collection()
575        }
576
577        /// Applies a supplied function to each update.
578        ///
579        /// This method is most commonly used to report information back to the user, often for debugging purposes.
580        /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
581        /// not guarantee that it will be called as many times as you might expect.
582        ///
583        /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
584        /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
585        /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
586        /// interesting and less intuitive, unfortunately.
587        ///
588        /// # Examples
589        ///
590        /// ```
591        /// use differential_dataflow::input::Input;
592        ///
593        /// ::timely::example(|scope| {
594        ///     scope.new_collection_from(1 .. 10).1
595        ///          .map_in_place(|x| *x *= 2)
596        ///          .filter(|x| x % 2 == 1)
597        ///          .inspect(|x| println!("error: {:?}", x));
598        /// });
599        /// ```
600        pub fn inspect<F>(self, func: F) -> Collection<'scope, T, D, R>
601        where
602            F: FnMut(&(D, T, R))+'static,
603        {
604            self.inner
605                .inspect(func)
606                .as_collection()
607        }
608        /// Applies a supplied function to each batch of updates.
609        ///
610        /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
611        /// timely dataflow capability associated with the batch of updates. The observed batching depends
612        /// on how the system executes, and may vary run to run.
613        ///
614        /// # Examples
615        ///
616        /// ```
617        /// use differential_dataflow::input::Input;
618        ///
619        /// ::timely::example(|scope| {
620        ///     scope.new_collection_from(1 .. 10).1
621        ///          .map_in_place(|x| *x *= 2)
622        ///          .filter(|x| x % 2 == 1)
623        ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
624        /// });
625        /// ```
626        pub fn inspect_batch<F>(self, mut func: F) -> Collection<'scope, T, D, R>
627        where
628            F: FnMut(&T, &[(D, T, R)])+'static,
629        {
630            self.inner
631                .inspect_batch(move |time, data| func(time, data))
632                .as_collection()
633        }
634
635        /// Assert if the collection is ever non-empty.
636        ///
637        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
638        /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
639        /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
640        /// program should indicate that this assertion never found cause to complain.
641        ///
642        /// # Examples
643        ///
644        /// ```
645        /// use differential_dataflow::input::Input;
646        ///
647        /// ::timely::example(|scope| {
648        ///     scope.new_collection_from(1 .. 10).1
649        ///          .map(|x| x * 2)
650        ///          .filter(|x| x % 2 == 1)
651        ///          .assert_empty();
652        /// });
653        /// ```
654        pub fn assert_empty(self)
655        where
656            D: crate::ExchangeData+Hashable,
657            R: crate::ExchangeData+Hashable + Semigroup,
658            T: Lattice+Ord,
659        {
660            self.consolidate()
661                .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
662        }
663    }
664
665    /// Methods requiring an Abelian difference, to support negation.
666    impl<'scope, T: Timestamp + Clone + 'static, D: Clone+'static, R: Abelian+'static> Collection<'scope, T, D, R> {
667        /// Assert if the collections are ever different.
668        ///
669        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
670        /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
671        /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
672        /// indicate that this assertion never found cause to complain.
673        ///
674        /// # Examples
675        ///
676        /// ```
677        /// use differential_dataflow::input::Input;
678        ///
679        /// ::timely::example(|scope| {
680        ///
681        ///     let data = scope.new_collection_from(1 .. 10).1;
682        ///
683        ///     let odds = data.clone().filter(|x| x % 2 == 1);
684        ///     let evens = data.clone().filter(|x| x % 2 == 0);
685        ///
686        ///     odds.concat(evens)
687        ///         .assert_eq(data);
688        /// });
689        /// ```
690        pub fn assert_eq(self, other: Self)
691        where
692            D: crate::ExchangeData+Hashable,
693            R: crate::ExchangeData+Hashable,
694            T: Lattice+Ord,
695        {
696            self.negate()
697                .concat(other)
698                .assert_empty();
699        }
700    }
701
702    use crate::trace::{Trace, Builder};
703    use crate::operators::arrange::{Arranged, TraceAgent};
704
705    impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
706    where
707        T: Timestamp + Lattice + Ord,
708        K: crate::ExchangeData+Hashable,
709        V: crate::ExchangeData,
710        R: crate::ExchangeData+Semigroup,
711    {
712        /// Applies a reduction function on records grouped by key.
713        ///
714        /// Input data must be structured as `(key, val)` pairs.
715        /// The user-supplied reduction function takes as arguments
716        ///
717        /// 1. a reference to the key,
718        /// 2. a reference to the slice of values and their accumulated updates,
719        /// 3. a mutuable reference to a vector to populate with output values and accumulated updates.
720        ///
721        /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
722        /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
723        /// `Ord` implementations.
724        ///
725        /// # Examples
726        ///
727        /// ```
728        /// use differential_dataflow::input::Input;
729        ///
730        /// ::timely::example(|scope| {
731        ///     // report the smallest value for each group
732        ///     scope.new_collection_from(1 .. 10).1
733        ///          .map(|x| (x / 3, x))
734        ///          .reduce(|_key, input, output| {
735        ///              output.push((*input[0].0, 1))
736        ///          });
737        /// });
738        /// ```
739        pub fn reduce<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, logic: L) -> Collection<'scope, T, (K, V2), R2>
740        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
741            self.reduce_named("Reduce", logic)
742        }
743
744        /// As `reduce` with the ability to name the operator.
745        pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, name: &str, logic: L) -> Collection<'scope, T, (K, V2), R2>
746        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
747            use crate::trace::implementations::{ValBuilder, ValSpine};
748
749            self.arrange_by_key_named(&format!("Arrange: {}", name))
750                .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>,_>(
751                    name,
752                    logic,
753                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
754                )
755                .as_collection(|k,v| (k.clone(), v.clone()))
756        }
757
758        /// Applies `reduce` to arranged data, and returns an arrangement of output data.
759        ///
760        /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
761        /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
762        ///
763        /// # Examples
764        ///
765        /// ```
766        /// use differential_dataflow::input::Input;
767        /// use differential_dataflow::trace::Trace;
768        /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
769        ///
770        /// ::timely::example(|scope| {
771        ///
772        ///     let trace =
773        ///     scope.new_collection_from(1 .. 10u32).1
774        ///          .map(|x| (x, x))
775        ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
776        ///             "Example",
777        ///              move |_key, src, dst| dst.push((*src[0].0, 1))
778        ///          )
779        ///          .trace;
780        /// });
781        /// ```
782        pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent<T2>>
783        where
784            T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static,
785            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
786            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
787        {
788            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
789                if !input.is_empty() { logic(key, input, change); }
790                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
791                crate::consolidation::consolidate(change);
792            })
793        }
794
795        /// Solves for output updates when presented with inputs and would-be outputs.
796        ///
797        /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
798        /// and it may not be safe to index into the first element.
799        /// At least one of the two collections will be non-empty.
800        pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
801        where
802            V: Clone+'static,
803            T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=T>+'static,
804            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
805            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
806        {
807            self.arrange_by_key_named(&format!("Arrange: {}", name))
808                .reduce_core::<_,Bu,_,_>(
809                    name,
810                    logic,
811                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
812                )
813        }
814    }
815
816    impl<'scope, T, K, R1> Collection<'scope, T, K, R1>
817    where
818        T: Timestamp + Lattice + Ord,
819        K: crate::ExchangeData+Hashable,
820        R1: crate::ExchangeData+Semigroup
821    {
822
823        /// Reduces the collection to one occurrence of each distinct element.
824        ///
825        /// # Examples
826        ///
827        /// ```
828        /// use differential_dataflow::input::Input;
829        ///
830        /// ::timely::example(|scope| {
831        ///     // report at most one of each key.
832        ///     scope.new_collection_from(1 .. 10).1
833        ///          .map(|x| x / 3)
834        ///          .distinct();
835        /// });
836        /// ```
837        pub fn distinct(self) -> Collection<'scope, T, K, isize> {
838            self.distinct_core()
839        }
840
841        /// Distinct for general integer differences.
842        ///
843        /// This method allows `distinct` to produce collections whose difference
844        /// type is something other than an `isize` integer, for example perhaps an
845        /// `i32`.
846        pub fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(self) -> Collection<'scope, T, K, R2> {
847            self.threshold_named("Distinct", |_,_| R2::from(1i8))
848        }
849
850        /// Transforms the multiplicity of records.
851        ///
852        /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
853        /// least the computation may behave as if it does. Otherwise, the transformation
854        /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
855        ///
856        /// # Examples
857        ///
858        /// ```
859        /// use differential_dataflow::input::Input;
860        ///
861        /// ::timely::example(|scope| {
862        ///     // report at most one of each key.
863        ///     scope.new_collection_from(1 .. 10).1
864        ///          .map(|x| x / 3)
865        ///          .threshold(|_,c| c % 2);
866        /// });
867        /// ```
868        pub fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(self, thresh: F) -> Collection<'scope, T, K, R2> {
869            self.threshold_named("Threshold", thresh)
870        }
871
872        /// A `threshold` with the ability to name the operator.
873        pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(self, name: &str, mut thresh: F) -> Collection<'scope, T, K, R2> {
874            use crate::trace::implementations::{KeyBuilder, KeySpine};
875
876            self.arrange_by_self_named(&format!("Arrange: {}", name))
877                .reduce_abelian::<_,KeyBuilder<K,T,R2>,KeySpine<K,T,R2>,_>(
878                    name,
879                    move |k,s,t| t.push(((), thresh(k, &s[0].1))),
880                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
881                )
882                .as_collection(|k,_| k.clone())
883        }
884
885    }
886
887    impl<'scope, T, K, R> Collection<'scope, T, K, R>
888    where
889        T: Timestamp + Lattice + Ord,
890        K: crate::ExchangeData+Hashable,
891        R: crate::ExchangeData+Semigroup
892    {
893
894        /// Counts the number of occurrences of each element.
895        ///
896        /// # Examples
897        ///
898        /// ```
899        /// use differential_dataflow::input::Input;
900        ///
901        /// ::timely::example(|scope| {
902        ///     // report the number of occurrences of each key
903        ///     scope.new_collection_from(1 .. 10).1
904        ///          .map(|x| x / 3)
905        ///          .count();
906        /// });
907        /// ```
908        pub fn count(self) -> Collection<'scope, T, (K, R), isize> { self.count_core() }
909
910        /// Count for general integer differences.
911        ///
912        /// This method allows `count` to produce collections whose difference
913        /// type is something other than an `isize` integer, for example perhaps an
914        /// `i32`.
915        pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<'scope, T, (K, R), R2> {
916            use crate::trace::implementations::{ValBuilder, ValSpine};
917            self.arrange_by_self_named("Arrange: Count")
918                .reduce_abelian::<_,ValBuilder<K,R,T,R2>,ValSpine<K,R,T,R2>,_>(
919                    "Count",
920                    |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))),
921                    |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
922                )
923                .as_collection(|k,c| (k.clone(), c.clone()))
924        }
925    }
926
927    /// Methods which require data be arrangeable.
928    impl<'scope, T, D, R> Collection<'scope, T, D, R>
929    where
930        T: Timestamp + Clone + 'static + Lattice,
931        D: crate::ExchangeData+Hashable,
932        R: crate::ExchangeData+Semigroup,
933    {
934        /// Aggregates the weights of equal records into at most one record.
935        ///
936        /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
937        /// accumulated in place, each held back until their timestamp has completed.
938        ///
939        /// # Examples
940        ///
941        /// ```
942        /// use differential_dataflow::input::Input;
943        ///
944        /// ::timely::example(|scope| {
945        ///
946        ///     let x = scope.new_collection_from(1 .. 10u32).1;
947        ///
948        ///     x.clone()
949        ///      .negate()
950        ///      .concat(x)
951        ///      .consolidate() // <-- ensures cancellation occurs
952        ///      .assert_empty();
953        /// });
954        /// ```
955        pub fn consolidate(self) -> Self {
956            use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
957            self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
958        }
959
960        /// As `consolidate` but with the ability to name the operator, specify the trace type,
961        /// and provide the function `reify` to produce owned keys and values..
962        pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
963        where
964            Ba: crate::trace::Batcher<Output=Vec<((D, ()), T, R)>, Time=T> + 'static,
965            Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
966            Bu: crate::trace::Builder<Time=Tr::Time, Input=Vec<((D, ()), T, R)>, Output=Tr::Batch>,
967            F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
968        {
969            use crate::operators::arrange::arrangement::Arrange;
970            self.map(|k| (k, ()))
971                .arrange_named::<Ba, Bu, Tr>(name)
972                .as_collection(reify)
973        }
974
975        /// Aggregates the weights of equal records.
976        ///
977        /// Unlike `consolidate`, this method does not exchange data and does not
978        /// ensure that at most one copy of each `(data, time)` pair exists in the
979        /// results. Instead, it acts on each batch of data and collapses equivalent
980        /// `(data, time)` pairs found therein, suppressing any that accumulate to
981        /// zero.
982        ///
983        /// # Examples
984        ///
985        /// ```
986        /// use differential_dataflow::input::Input;
987        ///
988        /// ::timely::example(|scope| {
989        ///
990        ///     let x = scope.new_collection_from(1 .. 10u32).1;
991        ///
992        ///     // nothing to assert, as no particular guarantees.
993        ///     x.clone()
994        ///      .negate()
995        ///      .concat(x)
996        ///      .consolidate_stream();
997        /// });
998        /// ```
999        pub fn consolidate_stream(self) -> Self {
1000
1001            use timely::dataflow::channels::pact::Pipeline;
1002            use timely::dataflow::operators::Operator;
1003            use crate::collection::AsCollection;
1004            use crate::consolidation::ConsolidatingContainerBuilder;
1005
1006            self.inner
1007                .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
1008
1009                    move |input, output| {
1010                        input.for_each(|time, data| {
1011                            output.session_with_builder(&time).give_iterator(data.drain(..));
1012                        })
1013                    }
1014                })
1015                .as_collection()
1016        }
1017    }
1018
1019    use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
1020    use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
1021    use crate::trace::implementations::ContainerChunker;
1022    use crate::operators::arrange::Arrange;
1023
1024    impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
1025    where
1026        T: Timestamp + Lattice,
1027        K: crate::ExchangeData + Hashable,
1028        V: crate::ExchangeData,
1029        R: crate::ExchangeData + Semigroup,
1030    {
1031        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1032        where
1033            Ba: crate::trace::Batcher<Output=Vec<((K, V), T, R)>, Time=T> + 'static,
1034            Bu: crate::trace::Builder<Time=T, Input=Vec<((K, V), T, R)>, Output = Tr::Batch>,
1035            Tr: crate::trace::Trace<Time=T> + 'static,
1036        {
1037            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into());
1038            crate::operators::arrange::arrangement::arrange_core::<_, _, ContainerChunker<Vec<((K, V), T, R)>>, Ba, Bu, _>(self.inner, exchange, name)
1039        }
1040    }
1041
1042    impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R>
1043    where
1044        T: Timestamp + Lattice + Ord,
1045    {
1046        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1047        where
1048            Ba: crate::trace::Batcher<Output=Vec<((K, ()), T, R)>, Time=T> + 'static,
1049            Bu: crate::trace::Builder<Time=T, Input=Vec<((K, ()), T, R)>, Output = Tr::Batch>,
1050            Tr: crate::trace::Trace<Time=T> + 'static,
1051        {
1052            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into());
1053            crate::operators::arrange::arrangement::arrange_core::<_, _, ContainerChunker<Vec<((K, ()), T, R)>>, Ba, Bu, _>(self.map(|k| (k, ())).inner, exchange, name)
1054        }
1055    }
1056
1057
1058    impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R>
1059    where
1060        T: Timestamp + Lattice + Ord,
1061    {
1062        /// Arranges a collection of `(Key, Val)` records by `Key`.
1063        ///
1064        /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
1065        /// This trace is current for all times completed by the output stream, which can be used to
1066        /// safely identify the stable times and values in the trace.
1067        pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1068            self.arrange_by_key_named("ArrangeByKey")
1069        }
1070
1071        /// As `arrange_by_key` but with the ability to name the arrangement.
1072        pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1073            self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
1074        }
1075    }
1076
1077    impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R>
1078    where
1079        T: Timestamp + Lattice + Ord,
1080    {
1081        /// Arranges a collection of `Key` records by `Key`.
1082        ///
1083        /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
1084        /// This trace is current for all times complete in the output stream, which can be used to safely
1085        /// identify the stable times and values in the trace.
1086        pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1087            self.arrange_by_self_named("ArrangeBySelf")
1088        }
1089
1090        /// As `arrange_by_self` but with the ability to name the arrangement.
1091        pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1092            self.map(|k| (k, ()))
1093                .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
1094        }
1095    }
1096
1097    impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
1098    where
1099        T: Timestamp + Lattice + Ord,
1100        K: crate::ExchangeData+Hashable,
1101        V: crate::ExchangeData,
1102        R: crate::ExchangeData+Semigroup,
1103    {
1104        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1105        ///
1106        /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
1107        ///
1108        /// # Examples
1109        ///
1110        /// ```
1111        /// use differential_dataflow::input::Input;
1112        ///
1113        /// ::timely::example(|scope| {
1114        ///
1115        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1116        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1117        ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1118        ///
1119        ///     x.join(y)
1120        ///      .assert_eq(z);
1121        /// });
1122        /// ```
1123        pub fn join<V2, R2>(self, other: Collection<'scope, T, (K,V2), R2>) -> Collection<'scope, T, (K,(V,V2)), <R as Multiply<R2>>::Output>
1124        where
1125            K:  crate::ExchangeData,
1126            V2: crate::ExchangeData,
1127            R2: crate::ExchangeData+Semigroup,
1128            R: Multiply<R2, Output: Semigroup+'static>,
1129        {
1130            self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1131        }
1132
1133        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1134        ///
1135        /// # Examples
1136        ///
1137        /// ```
1138        /// use differential_dataflow::input::Input;
1139        ///
1140        /// ::timely::example(|scope| {
1141        ///
1142        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1143        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1144        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1145        ///
1146        ///     x.join_map(y, |_key, &a, &b| (a,b))
1147        ///      .assert_eq(z);
1148        /// });
1149        /// ```
1150        pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(self, other: Collection<'scope, T, (K, V2), R2>, mut logic: L) -> Collection<'scope, T, D, <R as Multiply<R2>>::Output>
1151        where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1152            let arranged1 = self.arrange_by_key();
1153            let arranged2 = other.arrange_by_key();
1154            arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1155        }
1156
1157        /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1158        ///
1159        /// When the second collection contains frequencies that are either zero or one this is the more traditional
1160        /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1161        /// the counts of the records in the first input.
1162        ///
1163        /// # Examples
1164        ///
1165        /// ```
1166        /// use differential_dataflow::input::Input;
1167        ///
1168        /// ::timely::example(|scope| {
1169        ///
1170        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1171        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1172        ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
1173        ///
1174        ///     x.semijoin(y)
1175        ///      .assert_eq(z);
1176        /// });
1177        /// ```
1178        pub fn semijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), <R as Multiply<R2>>::Output>
1179        where R: Multiply<R2, Output: Semigroup+'static> {
1180            let arranged1 = self.arrange_by_key();
1181            let arranged2 = other.arrange_by_self();
1182            arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone())))
1183        }
1184
1185        /// Subtracts the semijoin with `other` from `self`.
1186        ///
1187        /// In the case that `other` has multiplicities zero or one this results
1188        /// in a relational antijoin, in which we discard input records whose key
1189        /// is present in `other`. If the multiplicities could be other than zero
1190        /// or one, the semantic interpretation of this operator is less clear.
1191        ///
1192        /// In almost all cases, you should ensure that `other` has multiplicities
1193        /// that are zero or one, perhaps by using the `distinct` operator.
1194        ///
1195        /// # Examples
1196        ///
1197        /// ```
1198        /// use differential_dataflow::input::Input;
1199        ///
1200        /// ::timely::example(|scope| {
1201        ///
1202        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1203        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1204        ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
1205        ///
1206        ///     x.antijoin(y)
1207        ///      .assert_eq(z);
1208        /// });
1209        /// ```
1210        pub fn antijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), R>
1211        where R: Multiply<R2, Output=R>, R: Abelian+'static {
1212            self.clone().concat(self.semijoin(other).negate())
1213        }
1214
1215        /// Joins two arranged collections with the same key type.
1216        ///
1217        /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1218        /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1219        /// every value returned by the iterator.
1220        ///
1221        /// # Examples
1222        ///
1223        /// ```
1224        /// use differential_dataflow::input::Input;
1225        /// use differential_dataflow::trace::Trace;
1226        ///
1227        /// ::timely::example(|scope| {
1228        ///
1229        ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1230        ///                  .arrange_by_key();
1231        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1232        ///                  .arrange_by_key();
1233        ///
1234        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1235        ///
1236        ///     x.join_core(y, |_key, &a, &b| Some((a, b)))
1237        ///      .assert_eq(z);
1238        /// });
1239        /// ```
1240        pub fn join_core<Tr2,I,L> (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1241        where
1242            Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=T>+Clone+'static,
1243            R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1244            I: IntoIterator<Item: crate::Data>,
1245            L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1246        {
1247            self.arrange_by_key()
1248                .join_core(stream2, result)
1249        }
1250    }
1251}
1252
1253/// Conversion to a differential dataflow Collection.
1254pub trait AsCollection<'scope, T: Timestamp, C> {
1255    /// Converts the type to a differential dataflow collection.
1256    fn as_collection(self) -> Collection<'scope, T, C>;
1257}
1258
1259impl<'scope, T: Timestamp, C> AsCollection<'scope, T, C> for Stream<'scope, T, C> {
1260    /// Converts the type to a differential dataflow collection.
1261    ///
1262    /// By calling this method, you guarantee that the timestamp invariant (as documented on
1263    /// [Collection]) is upheld. This method will not check it.
1264    fn as_collection(self) -> Collection<'scope, T, C> {
1265        Collection::<T,C>::new(self)
1266    }
1267}
1268
1269/// Concatenates multiple collections.
1270///
1271/// This method has the effect of a sequence of calls to `concat`, but it does
1272/// so in one operator rather than a chain of many operators.
1273///
1274/// # Examples
1275///
1276/// ```
1277/// use differential_dataflow::input::Input;
1278///
1279/// ::timely::example(|scope| {
1280///
1281///     let data = scope.new_collection_from(1 .. 10).1;
1282///
1283///     let odds = data.clone().filter(|x| x % 2 == 1);
1284///     let evens = data.clone().filter(|x| x % 2 == 0);
1285///
1286///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
1287///         .assert_eq(data);
1288/// });
1289/// ```
1290pub fn concatenate<'scope, T, C, I>(scope: Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C>
1291where
1292    T: Timestamp,
1293    C: Container,
1294    I: IntoIterator<Item=Collection<'scope, T, C>>,
1295{
1296    scope
1297        .concatenate(iterator.into_iter().map(|x| x.inner))
1298        .as_collection()
1299}
1300
1301/// Traits that can be implemented by containers to provide functionality to collections based on them.
1302pub mod containers {
1303
1304    /// A container that can negate its updates.
1305    pub trait Negate {
1306        /// Negates Abelian differences of each update.
1307        fn negate(self) -> Self;
1308    }
1309
1310    /// A container that can enter from timestamp `T1` into timestamp `T2`.
1311    pub trait Enter<T1, T2> {
1312        /// The resulting container type.
1313        type InnerContainer;
1314        /// Update timestamps from `T1` to `T2`.
1315        fn enter(self) -> Self::InnerContainer;
1316    }
1317
1318    /// A container that can leave from timestamp `T1` into timestamp `T2`.
1319    pub trait Leave<T1, T2> {
1320        /// The resulting container type.
1321        type OuterContainer;
1322        /// Update timestamps from `T1` to `T2`.
1323        fn leave(self) -> Self::OuterContainer;
1324    }
1325
1326    /// A container that can advance timestamps by a summary `TS`.
1327    pub trait ResultsIn<TS> {
1328        /// Advance times in the container by `step`.
1329        fn results_in(self, step: &TS) -> Self;
1330    }
1331
1332
1333    /// Implementations of container traits for the `Vec` container.
1334    mod vec {
1335
1336        use timely::progress::{Timestamp, timestamp::Refines};
1337        use crate::collection::Abelian;
1338
1339        use super::{Negate, Enter, Leave, ResultsIn};
1340
1341        impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
1342            fn negate(mut self) -> Self {
1343                for (_data, _time, diff) in self.iter_mut() { diff.negate(); }
1344                self
1345            }
1346        }
1347
1348        impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
1349            type InnerContainer = Vec<(D, T2, R)>;
1350            fn enter(self) -> Self::InnerContainer {
1351                self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
1352            }
1353        }
1354
1355        impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
1356            type OuterContainer = Vec<(D, T2, R)>;
1357            fn leave(self) -> Self::OuterContainer {
1358                self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
1359            }
1360        }
1361
1362        impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
1363            fn results_in(self, step: &T::Summary) -> Self {
1364                use timely::progress::PathSummary;
1365                self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
1366            }
1367        }
1368    }
1369
1370    /// Implementations of container traits for the `Rc` container.
1371    mod rc {
1372        use std::rc::Rc;
1373
1374        use timely::progress::{Timestamp, timestamp::Refines};
1375
1376        use super::{Negate, Enter, Leave, ResultsIn};
1377
1378        impl<C: Negate+Clone+Default> Negate for Rc<C> {
1379            fn negate(mut self) -> Self {
1380                std::mem::take(Rc::make_mut(&mut self)).negate().into()
1381            }
1382        }
1383
1384        impl<C: Enter<T1, T2>+Clone+Default, T1: Timestamp, T2: Refines<T1>> Enter<T1, T2> for Rc<C> {
1385            type InnerContainer = Rc<C::InnerContainer>;
1386            fn enter(mut self) -> Self::InnerContainer {
1387                std::mem::take(Rc::make_mut(&mut self)).enter().into()
1388            }
1389        }
1390
1391        impl<C: Leave<T1, T2>+Clone+Default, T1: Refines<T2>, T2: Timestamp> Leave<T1, T2> for Rc<C> {
1392            type OuterContainer = Rc<C::OuterContainer>;
1393            fn leave(mut self) -> Self::OuterContainer {
1394                std::mem::take(Rc::make_mut(&mut self)).leave().into()
1395            }
1396        }
1397
1398        impl<C: ResultsIn<TS>+Clone+Default, TS> ResultsIn<TS> for Rc<C> {
1399            fn results_in(mut self, step: &TS) -> Self {
1400                std::mem::take(Rc::make_mut(&mut self)).results_in(step).into()
1401            }
1402        }
1403    }
1404}