Skip to main content

differential_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use timely::Container;
12use timely::progress::Timestamp;
13use timely::dataflow::scopes::Child;
14use timely::dataflow::{Scope, Stream};
15use timely::dataflow::operators::*;
16
17use crate::difference::Abelian;
18
19/// An evolving collection represented by a stream of abstract containers.
20///
21/// The containers purport to reperesent changes to a collection, and they must implement various traits
22/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
23/// on the containers, and streams of containers, are left to the container implementor to describe.
24#[derive(Clone)]
25pub struct Collection<G: Scope, C: 'static> {
26    /// The underlying timely dataflow stream.
27    ///
28    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
29    /// not intended to be the idiomatic way to work with the collection.
30    ///
31    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
32    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
33    /// unexpectedly.
34    pub inner: Stream<G, C>,
35}
36
37impl<G: Scope, C> Collection<G, C> {
38    /// Creates a new Collection from a timely dataflow stream.
39    ///
40    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
41    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
42    /// provides a `new_collection` method which will create a new collection for you without exposing
43    /// the underlying timely stream at all.
44    ///
45    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
46    /// method does not check it.
47    pub fn new(stream: Stream<G, C>) -> Self { Self { inner: stream } }
48}
49impl<G: Scope, C: Container> Collection<G, C> {
50    /// Creates a new collection accumulating the contents of the two collections.
51    ///
52    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
53    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
54    /// two collections.
55    ///
56    /// # Examples
57    ///
58    /// ```
59    /// use differential_dataflow::input::Input;
60    ///
61    /// ::timely::example(|scope| {
62    ///
63    ///     let data = scope.new_collection_from(1 .. 10).1;
64    ///
65    ///     let odds = data.clone().filter(|x| x % 2 == 1);
66    ///     let evens = data.clone().filter(|x| x % 2 == 0);
67    ///
68    ///     odds.concat(evens)
69    ///         .assert_eq(data);
70    /// });
71    /// ```
72    pub fn concat(self, other: Self) -> Self {
73        self.inner
74            .concat(other.inner)
75            .as_collection()
76    }
77    /// Creates a new collection accumulating the contents of the two collections.
78    ///
79    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
80    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
81    /// two collections.
82    ///
83    /// # Examples
84    ///
85    /// ```
86    /// use differential_dataflow::input::Input;
87    ///
88    /// ::timely::example(|scope| {
89    ///
90    ///     let data = scope.new_collection_from(1 .. 10).1;
91    ///
92    ///     let odds = data.clone().filter(|x| x % 2 == 1);
93    ///     let evens = data.clone().filter(|x| x % 2 == 0);
94    ///
95    ///     odds.concatenate(Some(evens))
96    ///         .assert_eq(data);
97    /// });
98    /// ```
99    pub fn concatenate<I>(self, sources: I) -> Self
100    where
101        I: IntoIterator<Item=Self>
102    {
103        self.inner
104            .scope()
105            .concatenate(sources.into_iter().map(|x| x.inner).chain([self.inner]))
106            .as_collection()
107    }
108    // Brings a Collection into a nested region.
109    ///
110    /// This method is a specialization of `enter` to the case where the nested scope is a region.
111    /// It removes the need for an operator that adjusts the timestamp.
112    pub fn enter_region<'a>(self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
113        self.inner
114            .enter(child)
115            .as_collection()
116    }
117    /// Applies a supplied function to each batch of updates.
118    ///
119    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
120    /// timely dataflow capability associated with the batch of updates. The observed batching depends
121    /// on how the system executes, and may vary run to run.
122    ///
123    /// # Examples
124    ///
125    /// ```
126    /// use differential_dataflow::input::Input;
127    ///
128    /// ::timely::example(|scope| {
129    ///     scope.new_collection_from(1 .. 10).1
130    ///          .map_in_place(|x| *x *= 2)
131    ///          .filter(|x| x % 2 == 1)
132    ///          .inspect_container(|event| println!("event: {:?}", event));
133    /// });
134    /// ```
135    pub fn inspect_container<F>(self, func: F) -> Self
136    where
137        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
138    {
139        self.inner
140            .inspect_container(func)
141            .as_collection()
142    }
143    /// Attaches a timely dataflow probe to the output of a Collection.
144    ///
145    /// This probe is used to determine when the state of the Collection has stabilized and can
146    /// be read out.
147    pub fn probe(self) -> (probe::Handle<G::Timestamp>, Self) {
148        let (handle, stream) = self.inner.probe();
149        (handle, stream.as_collection())
150    }
151    /// Attaches a timely dataflow probe to the output of a Collection.
152    ///
153    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
154    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
155    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
156    /// avoid swamping the system.
157    pub fn probe_with(self, handle: &probe::Handle<G::Timestamp>) -> Self {
158        Self::new(self.inner.probe_with(handle))
159    }
160    /// The scope containing the underlying timely dataflow stream.
161    pub fn scope(&self) -> G {
162        self.inner.scope()
163    }
164
165    /// Creates a new collection whose counts are the negation of those in the input.
166    ///
167    /// This method is most commonly used with `concat` to get those element in one collection but not another.
168    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
169    /// including negative counts.
170    ///
171    /// # Examples
172    ///
173    /// ```
174    /// use differential_dataflow::input::Input;
175    ///
176    /// ::timely::example(|scope| {
177    ///
178    ///     let data = scope.new_collection_from(1 .. 10).1;
179    ///
180    ///     let odds = data.clone().filter(|x| x % 2 == 1);
181    ///     let evens = data.clone().filter(|x| x % 2 == 0);
182    ///
183    ///     odds.negate()
184    ///         .concat(data)
185    ///         .assert_eq(evens);
186    /// });
187    /// ```
188    pub fn negate(self) -> Self where C: containers::Negate {
189        use timely::dataflow::channels::pact::Pipeline;
190        self.inner
191            .unary(Pipeline, "Negate", move |_,_| move |input, output| {
192                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
193            })
194            .as_collection()
195    }
196
197    /// Brings a Collection into a nested scope.
198    ///
199    /// # Examples
200    ///
201    /// ```
202    /// use timely::dataflow::Scope;
203    /// use differential_dataflow::input::Input;
204    ///
205    /// ::timely::example(|scope| {
206    ///
207    ///     let data = scope.new_collection_from(1 .. 10).1;
208    ///
209    ///     let result = scope.region(|child| {
210    ///         data.clone()
211    ///             .enter(child)
212    ///             .leave()
213    ///     });
214    ///
215    ///     data.assert_eq(result);
216    /// });
217    /// ```
218    pub fn enter<'a, T>(self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
219    where
220        C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
221        T: Refines<<G as ScopeParent>::Timestamp>,
222    {
223        use timely::dataflow::channels::pact::Pipeline;
224        self.inner
225            .enter(child)
226            .unary(Pipeline, "Enter", move |_,_| move |input, output| {
227                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
228            })
229            .as_collection()
230    }
231
232    /// Advances a timestamp in the stream according to the timestamp actions on the path.
233    ///
234    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
235    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
236    ///
237    /// # Examples
238    /// ```
239    /// use timely::dataflow::Scope;
240    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::BranchWhen};
241    ///
242    /// use differential_dataflow::input::Input;
243    ///
244    /// timely::example(|scope| {
245    ///     let summary1 = 5;
246    ///
247    ///     let data = scope.new_collection_from(1 .. 10).1;
248    ///     /// Applies `results_in` on every timestamp in the collection.
249    ///     data.results_in(summary1);
250    /// });
251    /// ```
252    pub fn results_in(self, step: <G::Timestamp as Timestamp>::Summary) -> Self
253    where
254        C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
255    {
256        use timely::dataflow::channels::pact::Pipeline;
257        self.inner
258            .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
259                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
260            })
261            .as_collection()
262    }
263}
264
265use timely::dataflow::scopes::ScopeParent;
266use timely::progress::timestamp::Refines;
267
268/// Methods requiring a nested scope.
269impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
270where
271    C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
272    T: Refines<<G as ScopeParent>::Timestamp>,
273{
274    /// Returns the final value of a Collection from a nested scope to its containing scope.
275    ///
276    /// # Examples
277    ///
278    /// ```
279    /// use timely::dataflow::Scope;
280    /// use differential_dataflow::input::Input;
281    ///
282    /// ::timely::example(|scope| {
283    ///
284    ///    let data = scope.new_collection_from(1 .. 10).1;
285    ///
286    ///    let result = scope.region(|child| {
287    ///         data.clone()
288    ///             .enter(child)
289    ///             .leave()
290    ///     });
291    ///
292    ///     data.assert_eq(result);
293    /// });
294    /// ```
295    pub fn leave(self) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
296        use timely::dataflow::channels::pact::Pipeline;
297        self.inner
298            .leave()
299            .unary(Pipeline, "Leave", move |_,_| move |input, output| {
300                input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
301            })
302            .as_collection()
303    }
304}
305
306/// Methods requiring a region as the scope.
307impl<G: Scope, C: Container+Clone+'static> Collection<Child<'_, G, G::Timestamp>, C>
308{
309    /// Returns the value of a Collection from a nested region to its containing scope.
310    ///
311    /// This method is a specialization of `leave` to the case that of a nested region.
312    /// It removes the need for an operator that adjusts the timestamp.
313    pub fn leave_region(self) -> Collection<G, C> {
314        self.inner
315            .leave()
316            .as_collection()
317    }
318}
319
320pub use vec::Collection as VecCollection;
321/// Specializations of `Collection` that use `Vec` as the container.
322pub mod vec {
323
324    use std::hash::Hash;
325
326    use timely::progress::Timestamp;
327    use timely::order::Product;
328    use timely::dataflow::scopes::child::Iterative;
329    use timely::dataflow::{Scope, ScopeParent};
330    use timely::dataflow::operators::*;
331    use timely::dataflow::operators::vec::*;
332
333    use crate::collection::AsCollection;
334    use crate::difference::{Semigroup, Abelian, Multiply};
335    use crate::lattice::Lattice;
336    use crate::hashable::Hashable;
337
338    /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
339    ///
340    /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
341    /// differential dataflow computation, you write as if the collection is a static dataset to which you
342    /// apply functional transformations, creating new collections. Once your computation is written, you
343    /// are able to mutate the collection (by inserting and removing elements); differential dataflow will
344    /// propagate changes through your functional computation and report the corresponding changes to the
345    /// output collections.
346    ///
347    /// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the
348    /// collection exists; as you write more complicated programs you may wish to introduce nested scopes
349    /// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
350    /// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
351    /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
352    /// defaults to) `isize`, representing changes to the occurrence count of each record.
353    ///
354    /// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`.
355    pub type Collection<G, D, R = isize> = super::Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
356
357
358    impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
359        /// Creates a new collection by applying the supplied function to each input element.
360        ///
361        /// # Examples
362        ///
363        /// ```
364        /// use differential_dataflow::input::Input;
365        ///
366        /// ::timely::example(|scope| {
367        ///     scope.new_collection_from(1 .. 10).1
368        ///          .map(|x| x * 2)
369        ///          .filter(|x| x % 2 == 1)
370        ///          .assert_empty();
371        /// });
372        /// ```
373        pub fn map<D2, L>(self, mut logic: L) -> Collection<G, D2, R>
374        where
375            D2: Clone+'static,
376            L: FnMut(D) -> D2 + 'static,
377        {
378            self.inner
379                .map(move |(data, time, delta)| (logic(data), time, delta))
380                .as_collection()
381        }
382        /// Creates a new collection by applying the supplied function to each input element.
383        ///
384        /// Although the name suggests in-place mutation, this function does not change the source collection,
385        /// but rather re-uses the underlying allocations in its implementation. The method is semantically
386        /// equivalent to `map`, but can be more efficient.
387        ///
388        /// # Examples
389        ///
390        /// ```
391        /// use differential_dataflow::input::Input;
392        ///
393        /// ::timely::example(|scope| {
394        ///     scope.new_collection_from(1 .. 10).1
395        ///          .map_in_place(|x| *x *= 2)
396        ///          .filter(|x| x % 2 == 1)
397        ///          .assert_empty();
398        /// });
399        /// ```
400        pub fn map_in_place<L>(self, mut logic: L) -> Collection<G, D, R>
401        where
402            L: FnMut(&mut D) + 'static,
403        {
404            self.inner
405                .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
406                .as_collection()
407        }
408        /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
409        ///
410        /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
411        /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
412        /// attempting to consolidate the results.
413        ///
414        /// # Examples
415        ///
416        /// ```
417        /// use differential_dataflow::input::Input;
418        ///
419        /// ::timely::example(|scope| {
420        ///     scope.new_collection_from(1 .. 10).1
421        ///          .flat_map(|x| 0 .. x);
422        /// });
423        /// ```
424        pub fn flat_map<I, L>(self, mut logic: L) -> Collection<G, I::Item, R>
425        where
426            G::Timestamp: Clone,
427            I: IntoIterator<Item: Clone+'static>,
428            L: FnMut(D) -> I + 'static,
429        {
430            self.inner
431                .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
432                .as_collection()
433        }
434        /// Creates a new collection containing those input records satisfying the supplied predicate.
435        ///
436        /// # Examples
437        ///
438        /// ```
439        /// use differential_dataflow::input::Input;
440        ///
441        /// ::timely::example(|scope| {
442        ///     scope.new_collection_from(1 .. 10).1
443        ///          .map(|x| x * 2)
444        ///          .filter(|x| x % 2 == 1)
445        ///          .assert_empty();
446        /// });
447        /// ```
448        pub fn filter<L>(self, mut logic: L) -> Collection<G, D, R>
449        where
450            L: FnMut(&D) -> bool + 'static,
451        {
452            self.inner
453                .filter(move |(data, _, _)| logic(data))
454                .as_collection()
455        }
456        /// Replaces each record with another, with a new difference type.
457        ///
458        /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
459        /// and move the data into the difference component. This will allow differential dataflow to update in-place.
460        ///
461        /// # Examples
462        ///
463        /// ```
464        /// use differential_dataflow::input::Input;
465        ///
466        /// ::timely::example(|scope| {
467        ///
468        ///     let nums = scope.new_collection_from(0 .. 10).1;
469        ///     let x1 = nums.clone().flat_map(|x| 0 .. x);
470        ///     let x2 = nums.map(|x| (x, 9 - x))
471        ///                  .explode(|(x,y)| Some((x,y)));
472        ///
473        ///     x1.assert_eq(x2);
474        /// });
475        /// ```
476        pub fn explode<D2, R2, I, L>(self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
477        where
478            D2: Clone+'static,
479            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
480            I: IntoIterator<Item=(D2,R2)>,
481            L: FnMut(D)->I+'static,
482        {
483            self.inner
484                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
485                .as_collection()
486        }
487
488        /// Joins each record against a collection defined by the function `logic`.
489        ///
490        /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
491        /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
492        /// modifications made to the results, namely joining timestamps and multiplying differences.
493        ///
494        /// #Examples
495        ///
496        /// ```
497        /// use differential_dataflow::input::Input;
498        ///
499        /// ::timely::example(|scope| {
500        ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
501        ///     // for x from 0 through 9.
502        ///     scope.new_collection_from(0 .. 10isize).1
503        ///          .join_function(|x|
504        ///              //   data      time      diff
505        ///              vec![(2*x, (3*x) as u64,  x),
506        ///                   (2*x, (4*x) as u64, -x)]
507        ///           );
508        /// });
509        /// ```
510        pub fn join_function<D2, R2, I, L>(self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
511        where
512            G::Timestamp: Lattice,
513            D2: Clone+'static,
514            R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
515            I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
516            L: FnMut(D)->I+'static,
517        {
518            self.inner
519                .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
520                .as_collection()
521        }
522
523        /// Brings a Collection into a nested scope, at varying times.
524        ///
525        /// The `initial` function indicates the time at which each element of the Collection should appear.
526        ///
527        /// # Examples
528        ///
529        /// ```
530        /// use timely::dataflow::Scope;
531        /// use differential_dataflow::input::Input;
532        ///
533        /// ::timely::example(|scope| {
534        ///
535        ///     let data = scope.new_collection_from(1 .. 10).1;
536        ///
537        ///     let result = scope.iterative::<u64,_,_>(|child| {
538        ///         data.clone()
539        ///             .enter_at(child, |x| *x)
540        ///             .leave()
541        ///     });
542        ///
543        ///     data.assert_eq(result);
544        /// });
545        /// ```
546        pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
547        where
548            T: Timestamp+Hash,
549            F: FnMut(&D) -> T + Clone + 'static,
550            G::Timestamp: Hash,
551        {
552            self.inner
553                .enter(child)
554                .map(move |(data, time, diff)| {
555                    let new_time = Product::new(time, initial(&data));
556                    (data, new_time, diff)
557                })
558                .as_collection()
559        }
560
561        /// Delays each difference by a supplied function.
562        ///
563        /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
564        /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
565        /// ordered, they should have the same order or compare equal once `func` is applied to them (this
566        /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
567        /// to all of the data timestamps).
568        pub fn delay<F>(self, func: F) -> Collection<G, D, R>
569        where
570            G::Timestamp: Hash,
571            F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
572        {
573            let mut func1 = func.clone();
574            let mut func2 = func.clone();
575
576            self.inner
577                .delay_batch(move |x| func1(x))
578                .map_in_place(move |x| x.1 = func2(&x.1))
579                .as_collection()
580        }
581
582        /// Applies a supplied function to each update.
583        ///
584        /// This method is most commonly used to report information back to the user, often for debugging purposes.
585        /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
586        /// not guarantee that it will be called as many times as you might expect.
587        ///
588        /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
589        /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
590        /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
591        /// interesting and less intuitive, unfortunately.
592        ///
593        /// # Examples
594        ///
595        /// ```
596        /// use differential_dataflow::input::Input;
597        ///
598        /// ::timely::example(|scope| {
599        ///     scope.new_collection_from(1 .. 10).1
600        ///          .map_in_place(|x| *x *= 2)
601        ///          .filter(|x| x % 2 == 1)
602        ///          .inspect(|x| println!("error: {:?}", x));
603        /// });
604        /// ```
605        pub fn inspect<F>(self, func: F) -> Collection<G, D, R>
606        where
607            F: FnMut(&(D, G::Timestamp, R))+'static,
608        {
609            self.inner
610                .inspect(func)
611                .as_collection()
612        }
613        /// Applies a supplied function to each batch of updates.
614        ///
615        /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
616        /// timely dataflow capability associated with the batch of updates. The observed batching depends
617        /// on how the system executes, and may vary run to run.
618        ///
619        /// # Examples
620        ///
621        /// ```
622        /// use differential_dataflow::input::Input;
623        ///
624        /// ::timely::example(|scope| {
625        ///     scope.new_collection_from(1 .. 10).1
626        ///          .map_in_place(|x| *x *= 2)
627        ///          .filter(|x| x % 2 == 1)
628        ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
629        /// });
630        /// ```
631        pub fn inspect_batch<F>(self, mut func: F) -> Collection<G, D, R>
632        where
633            F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
634        {
635            self.inner
636                .inspect_batch(move |time, data| func(time, data))
637                .as_collection()
638        }
639
640        /// Assert if the collection is ever non-empty.
641        ///
642        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
643        /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
644        /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
645        /// program should indicate that this assertion never found cause to complain.
646        ///
647        /// # Examples
648        ///
649        /// ```
650        /// use differential_dataflow::input::Input;
651        ///
652        /// ::timely::example(|scope| {
653        ///     scope.new_collection_from(1 .. 10).1
654        ///          .map(|x| x * 2)
655        ///          .filter(|x| x % 2 == 1)
656        ///          .assert_empty();
657        /// });
658        /// ```
659        pub fn assert_empty(self)
660        where
661            D: crate::ExchangeData+Hashable,
662            R: crate::ExchangeData+Hashable + Semigroup,
663            G::Timestamp: Lattice+Ord,
664        {
665            self.consolidate()
666                .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
667        }
668    }
669
670    /// Methods requiring an Abelian difference, to support negation.
671    impl<G: Scope<Timestamp: Clone+'static>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
672        /// Assert if the collections are ever different.
673        ///
674        /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
675        /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
676        /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
677        /// indicate that this assertion never found cause to complain.
678        ///
679        /// # Examples
680        ///
681        /// ```
682        /// use differential_dataflow::input::Input;
683        ///
684        /// ::timely::example(|scope| {
685        ///
686        ///     let data = scope.new_collection_from(1 .. 10).1;
687        ///
688        ///     let odds = data.clone().filter(|x| x % 2 == 1);
689        ///     let evens = data.clone().filter(|x| x % 2 == 0);
690        ///
691        ///     odds.concat(evens)
692        ///         .assert_eq(data);
693        /// });
694        /// ```
695        pub fn assert_eq(self, other: Self)
696        where
697            D: crate::ExchangeData+Hashable,
698            R: crate::ExchangeData+Hashable,
699            G::Timestamp: Lattice+Ord,
700        {
701            self.negate()
702                .concat(other)
703                .assert_empty();
704        }
705    }
706
707    use crate::trace::{Trace, Builder};
708    use crate::operators::arrange::{Arranged, TraceAgent};
709
710    impl <G, K, V, R> Collection<G, (K, V), R>
711    where
712        G: Scope<Timestamp: Lattice+Ord>,
713        K: crate::ExchangeData+Hashable,
714        V: crate::ExchangeData,
715        R: crate::ExchangeData+Semigroup,
716    {
717        /// Applies a reduction function on records grouped by key.
718        ///
719        /// Input data must be structured as `(key, val)` pairs.
720        /// The user-supplied reduction function takes as arguments
721        ///
722        /// 1. a reference to the key,
723        /// 2. a reference to the slice of values and their accumulated updates,
724        /// 3. a mutuable reference to a vector to populate with output values and accumulated updates.
725        ///
726        /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
727        /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
728        /// `Ord` implementations.
729        ///
730        /// # Examples
731        ///
732        /// ```
733        /// use differential_dataflow::input::Input;
734        ///
735        /// ::timely::example(|scope| {
736        ///     // report the smallest value for each group
737        ///     scope.new_collection_from(1 .. 10).1
738        ///          .map(|x| (x / 3, x))
739        ///          .reduce(|_key, input, output| {
740        ///              output.push((*input[0].0, 1))
741        ///          });
742        /// });
743        /// ```
744        pub fn reduce<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, logic: L) -> Collection<G, (K, V2), R2>
745        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
746            self.reduce_named("Reduce", logic)
747        }
748
749        /// As `reduce` with the ability to name the operator.
750        pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
751        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
752            use crate::trace::implementations::{ValBuilder, ValSpine};
753
754            self.arrange_by_key_named(&format!("Arrange: {}", name))
755                .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
756                .as_collection(|k,v| (k.clone(), v.clone()))
757        }
758
759        /// Applies `reduce` to arranged data, and returns an arrangement of output data.
760        ///
761        /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
762        /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
763        ///
764        /// # Examples
765        ///
766        /// ```
767        /// use differential_dataflow::input::Input;
768        /// use differential_dataflow::trace::Trace;
769        /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
770        ///
771        /// ::timely::example(|scope| {
772        ///
773        ///     let trace =
774        ///     scope.new_collection_from(1 .. 10u32).1
775        ///          .map(|x| (x, x))
776        ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
777        ///             "Example",
778        ///              move |_key, src, dst| dst.push((*src[0].0, 1))
779        ///          )
780        ///          .trace;
781        /// });
782        /// ```
783        pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
784        where
785            T2: for<'a> Trace<Key<'a>= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
786            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
787            L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
788        {
789            self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
790                if !input.is_empty() { logic(key, input, change); }
791                change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
792                crate::consolidation::consolidate(change);
793            })
794        }
795
796        /// Solves for output updates when presented with inputs and would-be outputs.
797        ///
798        /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
799        /// and it may not be safe to index into the first element.
800        /// At least one of the two collections will be non-empty.
801        pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
802        where
803            V: Clone+'static,
804            T2: for<'a> Trace<Key<'a>=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static,
805            Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
806            L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
807        {
808            self.arrange_by_key_named(&format!("Arrange: {}", name))
809                .reduce_core::<_,Bu,_>(name, logic)
810        }
811    }
812
813    impl<G, K, R1> Collection<G, K, R1>
814    where
815        G: Scope<Timestamp: Lattice+Ord>,
816        K: crate::ExchangeData+Hashable,
817        R1: crate::ExchangeData+Semigroup
818    {
819
820        /// Reduces the collection to one occurrence of each distinct element.
821        ///
822        /// # Examples
823        ///
824        /// ```
825        /// use differential_dataflow::input::Input;
826        ///
827        /// ::timely::example(|scope| {
828        ///     // report at most one of each key.
829        ///     scope.new_collection_from(1 .. 10).1
830        ///          .map(|x| x / 3)
831        ///          .distinct();
832        /// });
833        /// ```
834        pub fn distinct(self) -> Collection<G, K, isize> {
835            self.distinct_core()
836        }
837
838        /// Distinct for general integer differences.
839        ///
840        /// This method allows `distinct` to produce collections whose difference
841        /// type is something other than an `isize` integer, for example perhaps an
842        /// `i32`.
843        pub fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(self) -> Collection<G, K, R2> {
844            self.threshold_named("Distinct", |_,_| R2::from(1i8))
845        }
846
847        /// Transforms the multiplicity of records.
848        ///
849        /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
850        /// least the computation may behave as if it does. Otherwise, the transformation
851        /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
852        ///
853        /// # Examples
854        ///
855        /// ```
856        /// use differential_dataflow::input::Input;
857        ///
858        /// ::timely::example(|scope| {
859        ///     // report at most one of each key.
860        ///     scope.new_collection_from(1 .. 10).1
861        ///          .map(|x| x / 3)
862        ///          .threshold(|_,c| c % 2);
863        /// });
864        /// ```
865        pub fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(self, thresh: F) -> Collection<G, K, R2> {
866            self.threshold_named("Threshold", thresh)
867        }
868
869        /// A `threshold` with the ability to name the operator.
870        pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
871            use crate::trace::implementations::{KeyBuilder, KeySpine};
872
873            self.arrange_by_self_named(&format!("Arrange: {}", name))
874                .reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
875                .as_collection(|k,_| k.clone())
876        }
877
878    }
879
880    impl<G, K, R> Collection<G, K, R>
881    where
882        G: Scope<Timestamp: Lattice+Ord>,
883        K: crate::ExchangeData+Hashable,
884        R: crate::ExchangeData+Semigroup
885    {
886
887        /// Counts the number of occurrences of each element.
888        ///
889        /// # Examples
890        ///
891        /// ```
892        /// use differential_dataflow::input::Input;
893        ///
894        /// ::timely::example(|scope| {
895        ///     // report the number of occurrences of each key
896        ///     scope.new_collection_from(1 .. 10).1
897        ///          .map(|x| x / 3)
898        ///          .count();
899        /// });
900        /// ```
901        pub fn count(self) -> Collection<G, (K, R), isize> { self.count_core() }
902
903        /// Count for general integer differences.
904        ///
905        /// This method allows `count` to produce collections whose difference
906        /// type is something other than an `isize` integer, for example perhaps an
907        /// `i32`.
908        pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<G, (K, R), R2> {
909            use crate::trace::implementations::{ValBuilder, ValSpine};
910            self.arrange_by_self_named("Arrange: Count")
911                .reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
912                .as_collection(|k,c| (k.clone(), c.clone()))
913        }
914    }
915
916    /// Methods which require data be arrangeable.
917    impl<G, D, R> Collection<G, D, R>
918    where
919        G: Scope<Timestamp: Clone+'static+Lattice>,
920        D: crate::ExchangeData+Hashable,
921        R: crate::ExchangeData+Semigroup,
922    {
923        /// Aggregates the weights of equal records into at most one record.
924        ///
925        /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
926        /// accumulated in place, each held back until their timestamp has completed.
927        ///
928        /// # Examples
929        ///
930        /// ```
931        /// use differential_dataflow::input::Input;
932        ///
933        /// ::timely::example(|scope| {
934        ///
935        ///     let x = scope.new_collection_from(1 .. 10u32).1;
936        ///
937        ///     x.clone()
938        ///      .negate()
939        ///      .concat(x)
940        ///      .consolidate() // <-- ensures cancellation occurs
941        ///      .assert_empty();
942        /// });
943        /// ```
944        pub fn consolidate(self) -> Self {
945            use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
946            self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
947        }
948
949        /// As `consolidate` but with the ability to name the operator, specify the trace type,
950        /// and provide the function `reify` to produce owned keys and values..
951        pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
952        where
953            Ba: crate::trace::Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
954            Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
955            Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
956            F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
957        {
958            use crate::operators::arrange::arrangement::Arrange;
959            self.map(|k| (k, ()))
960                .arrange_named::<Ba, Bu, Tr>(name)
961                .as_collection(reify)
962        }
963
964        /// Aggregates the weights of equal records.
965        ///
966        /// Unlike `consolidate`, this method does not exchange data and does not
967        /// ensure that at most one copy of each `(data, time)` pair exists in the
968        /// results. Instead, it acts on each batch of data and collapses equivalent
969        /// `(data, time)` pairs found therein, suppressing any that accumulate to
970        /// zero.
971        ///
972        /// # Examples
973        ///
974        /// ```
975        /// use differential_dataflow::input::Input;
976        ///
977        /// ::timely::example(|scope| {
978        ///
979        ///     let x = scope.new_collection_from(1 .. 10u32).1;
980        ///
981        ///     // nothing to assert, as no particular guarantees.
982        ///     x.clone()
983        ///      .negate()
984        ///      .concat(x)
985        ///      .consolidate_stream();
986        /// });
987        /// ```
988        pub fn consolidate_stream(self) -> Self {
989
990            use timely::dataflow::channels::pact::Pipeline;
991            use timely::dataflow::operators::Operator;
992            use crate::collection::AsCollection;
993            use crate::consolidation::ConsolidatingContainerBuilder;
994
995            self.inner
996                .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
997
998                    move |input, output| {
999                        input.for_each(|time, data| {
1000                            output.session_with_builder(&time).give_iterator(data.drain(..));
1001                        })
1002                    }
1003                })
1004                .as_collection()
1005        }
1006    }
1007
1008    use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
1009    use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
1010    use crate::operators::arrange::Arrange;
1011
1012    impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
1013    where
1014        G: Scope<Timestamp: Lattice>,
1015        K: crate::ExchangeData + Hashable,
1016        V: crate::ExchangeData,
1017        R: crate::ExchangeData + Semigroup,
1018    {
1019        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
1020        where
1021            Ba: crate::trace::Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
1022            Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
1023            Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
1024        {
1025            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
1026            crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name)
1027        }
1028    }
1029
1030    impl<G, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
1031    where
1032        G: Scope<Timestamp: Lattice+Ord>,
1033    {
1034        fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
1035        where
1036            Ba: crate::trace::Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
1037            Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
1038            Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
1039        {
1040            let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
1041            crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
1042        }
1043    }
1044
1045
1046    impl<G, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<G, (K,V), R>
1047    where
1048        G: Scope<Timestamp: Lattice+Ord>,
1049    {
1050        /// Arranges a collection of `(Key, Val)` records by `Key`.
1051        ///
1052        /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
1053        /// This trace is current for all times completed by the output stream, which can be used to
1054        /// safely identify the stable times and values in the trace.
1055        pub fn arrange_by_key(self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
1056            self.arrange_by_key_named("ArrangeByKey")
1057        }
1058
1059        /// As `arrange_by_key` but with the ability to name the arrangement.
1060        pub fn arrange_by_key_named(self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
1061            self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
1062        }
1063    }
1064
1065    impl<G, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<G, K, R>
1066    where
1067        G: Scope<Timestamp: Lattice+Ord>,
1068    {
1069        /// Arranges a collection of `Key` records by `Key`.
1070        ///
1071        /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
1072        /// This trace is current for all times complete in the output stream, which can be used to safely
1073        /// identify the stable times and values in the trace.
1074        pub fn arrange_by_self(self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
1075            self.arrange_by_self_named("ArrangeBySelf")
1076        }
1077
1078        /// As `arrange_by_self` but with the ability to name the arrangement.
1079        pub fn arrange_by_self_named(self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
1080            self.map(|k| (k, ()))
1081                .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
1082        }
1083    }
1084
1085    impl<G, K, V, R> Collection<G, (K, V), R>
1086    where
1087        G: Scope<Timestamp: Lattice+Ord>,
1088        K: crate::ExchangeData+Hashable,
1089        V: crate::ExchangeData,
1090        R: crate::ExchangeData+Semigroup,
1091    {
1092        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1093        ///
1094        /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
1095        ///
1096        /// # Examples
1097        ///
1098        /// ```
1099        /// use differential_dataflow::input::Input;
1100        ///
1101        /// ::timely::example(|scope| {
1102        ///
1103        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1104        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1105        ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1106        ///
1107        ///     x.join(y)
1108        ///      .assert_eq(z);
1109        /// });
1110        /// ```
1111        pub fn join<V2, R2>(self, other: Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
1112        where
1113            K:  crate::ExchangeData,
1114            V2: crate::ExchangeData,
1115            R2: crate::ExchangeData+Semigroup,
1116            R: Multiply<R2, Output: Semigroup+'static>,
1117        {
1118            self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1119        }
1120
1121        /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1122        ///
1123        /// # Examples
1124        ///
1125        /// ```
1126        /// use differential_dataflow::input::Input;
1127        ///
1128        /// ::timely::example(|scope| {
1129        ///
1130        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1131        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1132        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1133        ///
1134        ///     x.join_map(y, |_key, &a, &b| (a,b))
1135        ///      .assert_eq(z);
1136        /// });
1137        /// ```
1138        pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(self, other: Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
1139        where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1140            let arranged1 = self.arrange_by_key();
1141            let arranged2 = other.arrange_by_key();
1142            arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1143        }
1144
1145        /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1146        ///
1147        /// When the second collection contains frequencies that are either zero or one this is the more traditional
1148        /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1149        /// the counts of the records in the first input.
1150        ///
1151        /// # Examples
1152        ///
1153        /// ```
1154        /// use differential_dataflow::input::Input;
1155        ///
1156        /// ::timely::example(|scope| {
1157        ///
1158        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1159        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1160        ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
1161        ///
1162        ///     x.semijoin(y)
1163        ///      .assert_eq(z);
1164        /// });
1165        /// ```
1166        pub fn semijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
1167        where R: Multiply<R2, Output: Semigroup+'static> {
1168            let arranged1 = self.arrange_by_key();
1169            let arranged2 = other.arrange_by_self();
1170            arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone())))
1171        }
1172
1173        /// Subtracts the semijoin with `other` from `self`.
1174        ///
1175        /// In the case that `other` has multiplicities zero or one this results
1176        /// in a relational antijoin, in which we discard input records whose key
1177        /// is present in `other`. If the multiplicities could be other than zero
1178        /// or one, the semantic interpretation of this operator is less clear.
1179        ///
1180        /// In almost all cases, you should ensure that `other` has multiplicities
1181        /// that are zero or one, perhaps by using the `distinct` operator.
1182        ///
1183        /// # Examples
1184        ///
1185        /// ```
1186        /// use differential_dataflow::input::Input;
1187        ///
1188        /// ::timely::example(|scope| {
1189        ///
1190        ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1191        ///     let y = scope.new_collection_from(vec![0, 2]).1;
1192        ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
1193        ///
1194        ///     x.antijoin(y)
1195        ///      .assert_eq(z);
1196        /// });
1197        /// ```
1198        pub fn antijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<G, K, R2>) -> Collection<G, (K, V), R>
1199        where R: Multiply<R2, Output=R>, R: Abelian+'static {
1200            self.clone().concat(self.semijoin(other).negate())
1201        }
1202
1203        /// Joins two arranged collections with the same key type.
1204        ///
1205        /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1206        /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1207        /// every value returned by the iterator.
1208        ///
1209        /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
1210        /// contains the implementations for collections.
1211        ///
1212        /// # Examples
1213        ///
1214        /// ```
1215        /// use differential_dataflow::input::Input;
1216        /// use differential_dataflow::trace::Trace;
1217        ///
1218        /// ::timely::example(|scope| {
1219        ///
1220        ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1221        ///                  .arrange_by_key();
1222        ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1223        ///                  .arrange_by_key();
1224        ///
1225        ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1226        ///
1227        ///     x.join_core(y, |_key, &a, &b| Some((a, b)))
1228        ///      .assert_eq(z);
1229        /// });
1230        /// ```
1231        pub fn join_core<Tr2,I,L> (self, stream2: Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1232        where
1233            Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
1234            R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1235            I: IntoIterator<Item: crate::Data>,
1236            L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1237        {
1238            self.arrange_by_key()
1239                .join_core(stream2, result)
1240        }
1241    }
1242}
1243
1244/// Conversion to a differential dataflow Collection.
1245pub trait AsCollection<G: Scope, C> {
1246    /// Converts the type to a differential dataflow collection.
1247    fn as_collection(self) -> Collection<G, C>;
1248}
1249
1250impl<G: Scope, C> AsCollection<G, C> for Stream<G, C> {
1251    /// Converts the type to a differential dataflow collection.
1252    ///
1253    /// By calling this method, you guarantee that the timestamp invariant (as documented on
1254    /// [Collection]) is upheld. This method will not check it.
1255    fn as_collection(self) -> Collection<G, C> {
1256        Collection::<G,C>::new(self)
1257    }
1258}
1259
1260/// Concatenates multiple collections.
1261///
1262/// This method has the effect of a sequence of calls to `concat`, but it does
1263/// so in one operator rather than a chain of many operators.
1264///
1265/// # Examples
1266///
1267/// ```
1268/// use differential_dataflow::input::Input;
1269///
1270/// ::timely::example(|scope| {
1271///
1272///     let data = scope.new_collection_from(1 .. 10).1;
1273///
1274///     let odds = data.clone().filter(|x| x % 2 == 1);
1275///     let evens = data.clone().filter(|x| x % 2 == 0);
1276///
1277///     differential_dataflow::collection::concatenate(scope, vec![odds, evens])
1278///         .assert_eq(data);
1279/// });
1280/// ```
1281pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
1282where
1283    G: Scope,
1284    C: Container,
1285    I: IntoIterator<Item=Collection<G, C>>,
1286{
1287    scope
1288        .concatenate(iterator.into_iter().map(|x| x.inner))
1289        .as_collection()
1290}
1291
1292/// Traits that can be implemented by containers to provide functionality to collections based on them.
1293pub mod containers {
1294
1295    /// A container that can negate its updates.
1296    pub trait Negate {
1297        /// Negates Abelian differences of each update.
1298        fn negate(self) -> Self;
1299    }
1300
1301    /// A container that can enter from timestamp `T1` into timestamp `T2`.
1302    pub trait Enter<T1, T2> {
1303        /// The resulting container type.
1304        type InnerContainer;
1305        /// Update timestamps from `T1` to `T2`.
1306        fn enter(self) -> Self::InnerContainer;
1307    }
1308
1309    /// A container that can leave from timestamp `T1` into timestamp `T2`.
1310    pub trait Leave<T1, T2> {
1311        /// The resulting container type.
1312        type OuterContainer;
1313        /// Update timestamps from `T1` to `T2`.
1314        fn leave(self) -> Self::OuterContainer;
1315    }
1316
1317    /// A container that can advance timestamps by a summary `TS`.
1318    pub trait ResultsIn<TS> {
1319        /// Advance times in the container by `step`.
1320        fn results_in(self, step: &TS) -> Self;
1321    }
1322
1323
1324    /// Implementations of container traits for the `Vec` container.
1325    mod vec {
1326
1327        use timely::progress::{Timestamp, timestamp::Refines};
1328        use crate::collection::Abelian;
1329
1330        use super::{Negate, Enter, Leave, ResultsIn};
1331
1332        impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
1333            fn negate(mut self) -> Self {
1334                for (_data, _time, diff) in self.iter_mut() { diff.negate(); }
1335                self
1336            }
1337        }
1338
1339        impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
1340            type InnerContainer = Vec<(D, T2, R)>;
1341            fn enter(self) -> Self::InnerContainer {
1342                self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
1343            }
1344        }
1345
1346        impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
1347            type OuterContainer = Vec<(D, T2, R)>;
1348            fn leave(self) -> Self::OuterContainer {
1349                self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
1350            }
1351        }
1352
1353        impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
1354            fn results_in(self, step: &T::Summary) -> Self {
1355                use timely::progress::PathSummary;
1356                self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
1357            }
1358        }
1359    }
1360
1361    /// Implementations of container traits for the `Rc` container.
1362    mod rc {
1363        use std::rc::Rc;
1364
1365        use timely::progress::{Timestamp, timestamp::Refines};
1366
1367        use super::{Negate, Enter, Leave, ResultsIn};
1368
1369        impl<C: Negate+Clone+Default> Negate for Rc<C> {
1370            fn negate(mut self) -> Self {
1371                std::mem::take(Rc::make_mut(&mut self)).negate().into()
1372            }
1373        }
1374
1375        impl<C: Enter<T1, T2>+Clone+Default, T1: Timestamp, T2: Refines<T1>> Enter<T1, T2> for Rc<C> {
1376            type InnerContainer = Rc<C::InnerContainer>;
1377            fn enter(mut self) -> Self::InnerContainer {
1378                std::mem::take(Rc::make_mut(&mut self)).enter().into()
1379            }
1380        }
1381
1382        impl<C: Leave<T1, T2>+Clone+Default, T1: Refines<T2>, T2: Timestamp> Leave<T1, T2> for Rc<C> {
1383            type OuterContainer = Rc<C::OuterContainer>;
1384            fn leave(mut self) -> Self::OuterContainer {
1385                std::mem::take(Rc::make_mut(&mut self)).leave().into()
1386            }
1387        }
1388
1389        impl<C: ResultsIn<TS>+Clone+Default, TS> ResultsIn<TS> for Rc<C> {
1390            fn results_in(mut self, step: &TS) -> Self {
1391                std::mem::take(Rc::make_mut(&mut self)).results_in(step).into()
1392            }
1393        }
1394    }
1395}