Skip to main content

palimpsest_dataflow/
collection.rs

//! Types and traits associated with collections of data.
//!
//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
//!
//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
//! one collection into another, using operators defined on collections. This contrasts with a more
//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::dataflow::operators::*;
14use timely::dataflow::scopes::{child::Iterative, Child};
15use timely::dataflow::Scope;
16use timely::dataflow::StreamCore;
17use timely::order::Product;
18use timely::progress::Timestamp;
19use timely::{Container, Data};
20
21use crate::difference::{Abelian, Multiply, Semigroup};
22use crate::hashable::Hashable;
23use crate::lattice::Lattice;
24
/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
///
/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
/// differential dataflow computation, you write as if the collection is a static dataset to which you
/// apply functional transformations, creating new collections. Once your computation is written, you
/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
/// propagate changes through your functional computation and report the corresponding changes to the
/// output collections.
///
/// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the
/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
/// defaults to) `isize`, representing changes to the occurrence count of each record.
///
/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`, so
/// every update in the container is a `(data, time, diff)` triple.
pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
43
44/// An evolving collection represented by a stream of abstract containers.
45///
46/// The containers purport to reperesent changes to a collection, and they must implement various traits
47/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
48/// on the containers, and streams of containers, are left to the container implementor to describe.
49#[derive(Clone)]
50pub struct Collection<G: Scope, C> {
51    /// The underlying timely dataflow stream.
52    ///
53    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
54    /// not intended to be the idiomatic way to work with the collection.
55    ///
56    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
57    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
58    /// unexpectedly.
59    pub inner: timely::dataflow::StreamCore<G, C>,
60}
61
62impl<G: Scope, C> Collection<G, C> {
63    /// Creates a new Collection from a timely dataflow stream.
64    ///
65    /// This method seems to be rarely used, with the `as_collection` method on streams being a more
66    /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
67    /// provides a `new_collection` method which will create a new collection for you without exposing
68    /// the underlying timely stream at all.
69    ///
70    /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
71    /// method does not check it.
72    pub fn new(stream: StreamCore<G, C>) -> Self {
73        Self { inner: stream }
74    }
75}
76impl<G: Scope, C: Container> Collection<G, C> {
77    /// Creates a new collection accumulating the contents of the two collections.
78    ///
79    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
80    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
81    /// two collections.
82    ///
83    /// # Examples
84    ///
85    /// ```
86    /// use palimpsest_dataflow::input::Input;
87    ///
88    /// ::timely::example(|scope| {
89    ///
90    ///     let data = scope.new_collection_from(1 .. 10).1;
91    ///
92    ///     let odds = data.filter(|x| x % 2 == 1);
93    ///     let evens = data.filter(|x| x % 2 == 0);
94    ///
95    ///     odds.concat(&evens)
96    ///         .assert_eq(&data);
97    /// });
98    /// ```
99    pub fn concat(&self, other: &Self) -> Self {
100        self.inner.concat(&other.inner).as_collection()
101    }
102    /// Creates a new collection accumulating the contents of the two collections.
103    ///
104    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
105    /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
106    /// two collections.
107    ///
108    /// # Examples
109    ///
110    /// ```
111    /// use palimpsest_dataflow::input::Input;
112    ///
113    /// ::timely::example(|scope| {
114    ///
115    ///     let data = scope.new_collection_from(1 .. 10).1;
116    ///
117    ///     let odds = data.filter(|x| x % 2 == 1);
118    ///     let evens = data.filter(|x| x % 2 == 0);
119    ///
120    ///     odds.concatenate(Some(evens))
121    ///         .assert_eq(&data);
122    /// });
123    /// ```
124    pub fn concatenate<I>(&self, sources: I) -> Self
125    where
126        I: IntoIterator<Item = Self>,
127    {
128        self.inner
129            .concatenate(sources.into_iter().map(|x| x.inner))
130            .as_collection()
131    }
132    // Brings a Collection into a nested region.
133    ///
134    /// This method is a specialization of `enter` to the case where the nested scope is a region.
135    /// It removes the need for an operator that adjusts the timestamp.
136    pub fn enter_region<'a>(
137        &self,
138        child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
139    ) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
140        self.inner.enter(child).as_collection()
141    }
142    /// Applies a supplied function to each batch of updates.
143    ///
144    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
145    /// timely dataflow capability associated with the batch of updates. The observed batching depends
146    /// on how the system executes, and may vary run to run.
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// use palimpsest_dataflow::input::Input;
152    ///
153    /// ::timely::example(|scope| {
154    ///     scope.new_collection_from(1 .. 10).1
155    ///          .map_in_place(|x| *x *= 2)
156    ///          .filter(|x| x % 2 == 1)
157    ///          .inspect_container(|event| println!("event: {:?}", event));
158    /// });
159    /// ```
160    pub fn inspect_container<F>(&self, func: F) -> Self
161    where
162        F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,
163    {
164        self.inner.inspect_container(func).as_collection()
165    }
    /// Attaches a timely dataflow probe to the output of a Collection.
    ///
    /// The returned handle is used to determine when the state of the Collection has stabilized
    /// and can be read out. Use `probe_with` instead to attach an existing handle and keep
    /// chaining on the collection.
    pub fn probe(&self) -> probe::Handle<G::Timestamp> {
        self.inner.probe()
    }
173    /// Attaches a timely dataflow probe to the output of a Collection.
174    ///
175    /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
176    /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
177    /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
178    /// avoid swamping the system.
179    pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
180        Self::new(self.inner.probe_with(handle))
181    }
    /// The scope containing the underlying timely dataflow stream.
    ///
    /// The scope is returned by value, as reported by the `inner` stream.
    pub fn scope(&self) -> G {
        self.inner.scope()
    }
186
187    /// Creates a new collection whose counts are the negation of those in the input.
188    ///
189    /// This method is most commonly used with `concat` to get those element in one collection but not another.
190    /// However, differential dataflow computations are still defined for all values of the difference type `R`,
191    /// including negative counts.
192    ///
193    /// # Examples
194    ///
195    /// ```
196    /// use palimpsest_dataflow::input::Input;
197    ///
198    /// ::timely::example(|scope| {
199    ///
200    ///     let data = scope.new_collection_from(1 .. 10).1;
201    ///
202    ///     let odds = data.filter(|x| x % 2 == 1);
203    ///     let evens = data.filter(|x| x % 2 == 0);
204    ///
205    ///     odds.negate()
206    ///         .concat(&data)
207    ///         .assert_eq(&evens);
208    /// });
209    /// ```
210    pub fn negate(&self) -> Self
211    where
212        C: containers::Negate,
213    {
214        use timely::dataflow::channels::pact::Pipeline;
215        self.inner
216            .unary(Pipeline, "Negate", move |_, _| {
217                move |input, output| {
218                    input.for_each(|time, data| {
219                        output
220                            .session(&time)
221                            .give_container(&mut std::mem::take(data).negate())
222                    });
223                }
224            })
225            .as_collection()
226    }
227
228    /// Brings a Collection into a nested scope.
229    ///
230    /// # Examples
231    ///
232    /// ```
233    /// use timely::dataflow::Scope;
234    /// use palimpsest_dataflow::input::Input;
235    ///
236    /// ::timely::example(|scope| {
237    ///
238    ///     let data = scope.new_collection_from(1 .. 10).1;
239    ///
240    ///     let result = scope.region(|child| {
241    ///         data.enter(child)
242    ///             .leave()
243    ///     });
244    ///
245    ///     data.assert_eq(&result);
246    /// });
247    /// ```
248    pub fn enter<'a, T>(
249        &self,
250        child: &Child<'a, G, T>,
251    ) -> Collection<
252        Child<'a, G, T>,
253        <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer,
254    >
255    where
256        C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
257        T: Refines<<G as ScopeParent>::Timestamp>,
258    {
259        use timely::dataflow::channels::pact::Pipeline;
260        self.inner
261            .enter(child)
262            .unary(Pipeline, "Enter", move |_, _| {
263                move |input, output| {
264                    input.for_each(|time, data| {
265                        output
266                            .session(&time)
267                            .give_container(&mut std::mem::take(data).enter())
268                    });
269                }
270            })
271            .as_collection()
272    }
273
274    /// Advances a timestamp in the stream according to the timestamp actions on the path.
275    ///
276    /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
277    /// incrementing fields would result in integer overflow. In this case, the record is dropped.
278    ///
279    /// # Examples
280    /// ```
281    /// use timely::dataflow::Scope;
282    /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
283    ///
284    /// use palimpsest_dataflow::input::Input;
285    ///
286    /// timely::example(|scope| {
287    ///     let summary1 = 5;
288    ///
289    ///     let data = scope.new_collection_from(1 .. 10).1;
290    ///     /// Applies `results_in` on every timestamp in the collection.
291    ///     data.results_in(summary1);
292    /// });
293    /// ```
294    pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
295    where
296        C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
297    {
298        use timely::dataflow::channels::pact::Pipeline;
299        self.inner
300            .unary(Pipeline, "ResultsIn", move |_, _| {
301                move |input, output| {
302                    input.for_each(|time, data| {
303                        output
304                            .session(&time)
305                            .give_container(&mut std::mem::take(data).results_in(&step))
306                    });
307                }
308            })
309            .as_collection()
310    }
311}
312
313impl<G: Scope, D: Clone + 'static, R: Clone + 'static> VecCollection<G, D, R> {
314    /// Creates a new collection by applying the supplied function to each input element.
315    ///
316    /// # Examples
317    ///
318    /// ```
319    /// use palimpsest_dataflow::input::Input;
320    ///
321    /// ::timely::example(|scope| {
322    ///     scope.new_collection_from(1 .. 10).1
323    ///          .map(|x| x * 2)
324    ///          .filter(|x| x % 2 == 1)
325    ///          .assert_empty();
326    /// });
327    /// ```
328    pub fn map<D2, L>(&self, mut logic: L) -> VecCollection<G, D2, R>
329    where
330        D2: Data,
331        L: FnMut(D) -> D2 + 'static,
332    {
333        self.inner
334            .map(move |(data, time, delta)| (logic(data), time, delta))
335            .as_collection()
336    }
337    /// Creates a new collection by applying the supplied function to each input element.
338    ///
339    /// Although the name suggests in-place mutation, this function does not change the source collection,
340    /// but rather re-uses the underlying allocations in its implementation. The method is semantically
341    /// equivalent to `map`, but can be more efficient.
342    ///
343    /// # Examples
344    ///
345    /// ```
346    /// use palimpsest_dataflow::input::Input;
347    ///
348    /// ::timely::example(|scope| {
349    ///     scope.new_collection_from(1 .. 10).1
350    ///          .map_in_place(|x| *x *= 2)
351    ///          .filter(|x| x % 2 == 1)
352    ///          .assert_empty();
353    /// });
354    /// ```
355    pub fn map_in_place<L>(&self, mut logic: L) -> VecCollection<G, D, R>
356    where
357        L: FnMut(&mut D) + 'static,
358    {
359        self.inner
360            .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
361            .as_collection()
362    }
363    /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
364    ///
365    /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
366    /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
367    /// attempting to consolidate the results.
368    ///
369    /// # Examples
370    ///
371    /// ```
372    /// use palimpsest_dataflow::input::Input;
373    ///
374    /// ::timely::example(|scope| {
375    ///     scope.new_collection_from(1 .. 10).1
376    ///          .flat_map(|x| 0 .. x);
377    /// });
378    /// ```
379    pub fn flat_map<I, L>(&self, mut logic: L) -> VecCollection<G, I::Item, R>
380    where
381        G::Timestamp: Clone,
382        I: IntoIterator<Item: Data>,
383        L: FnMut(D) -> I + 'static,
384    {
385        self.inner
386            .flat_map(move |(data, time, delta)| {
387                logic(data)
388                    .into_iter()
389                    .map(move |x| (x, time.clone(), delta.clone()))
390            })
391            .as_collection()
392    }
393    /// Creates a new collection containing those input records satisfying the supplied predicate.
394    ///
395    /// # Examples
396    ///
397    /// ```
398    /// use palimpsest_dataflow::input::Input;
399    ///
400    /// ::timely::example(|scope| {
401    ///     scope.new_collection_from(1 .. 10).1
402    ///          .map(|x| x * 2)
403    ///          .filter(|x| x % 2 == 1)
404    ///          .assert_empty();
405    /// });
406    /// ```
407    pub fn filter<L>(&self, mut logic: L) -> VecCollection<G, D, R>
408    where
409        L: FnMut(&D) -> bool + 'static,
410    {
411        self.inner
412            .filter(move |(data, _, _)| logic(data))
413            .as_collection()
414    }
415    /// Replaces each record with another, with a new difference type.
416    ///
417    /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
418    /// and move the data into the difference component. This will allow differential dataflow to update in-place.
419    ///
420    /// # Examples
421    ///
422    /// ```
423    /// use palimpsest_dataflow::input::Input;
424    ///
425    /// ::timely::example(|scope| {
426    ///
427    ///     let nums = scope.new_collection_from(0 .. 10).1;
428    ///     let x1 = nums.flat_map(|x| 0 .. x);
429    ///     let x2 = nums.map(|x| (x, 9 - x))
430    ///                  .explode(|(x,y)| Some((x,y)));
431    ///
432    ///     x1.assert_eq(&x2);
433    /// });
434    /// ```
435    pub fn explode<D2, R2, I, L>(
436        &self,
437        mut logic: L,
438    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
439    where
440        D2: Data,
441        R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
442        I: IntoIterator<Item = (D2, R2)>,
443        L: FnMut(D) -> I + 'static,
444    {
445        self.inner
446            .flat_map(move |(x, t, d)| {
447                logic(x)
448                    .into_iter()
449                    .map(move |(x, d2)| (x, t.clone(), d2.multiply(&d)))
450            })
451            .as_collection()
452    }
453
454    /// Joins each record against a collection defined by the function `logic`.
455    ///
456    /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
457    /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
458    /// modifications made to the results, namely joining timestamps and multiplying differences.
459    ///
460    /// #Examples
461    ///
462    /// ```
463    /// use palimpsest_dataflow::input::Input;
464    ///
465    /// ::timely::example(|scope| {
466    ///     // creates `x` copies of `2*x` from time `3*x` until `4*x`,
467    ///     // for x from 0 through 9.
468    ///     scope.new_collection_from(0 .. 10isize).1
469    ///          .join_function(|x|
470    ///              //   data      time      diff
471    ///              vec![(2*x, (3*x) as u64,  x),
472    ///                   (2*x, (4*x) as u64, -x)]
473    ///           );
474    /// });
475    /// ```
476    pub fn join_function<D2, R2, I, L>(
477        &self,
478        mut logic: L,
479    ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
480    where
481        G::Timestamp: Lattice,
482        D2: Data,
483        R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
484        I: IntoIterator<Item = (D2, G::Timestamp, R2)>,
485        L: FnMut(D) -> I + 'static,
486    {
487        self.inner
488            .flat_map(move |(x, t, d)| {
489                logic(x)
490                    .into_iter()
491                    .map(move |(x, t2, d2)| (x, t.join(&t2), d2.multiply(&d)))
492            })
493            .as_collection()
494    }
495
496    /// Brings a Collection into a nested scope, at varying times.
497    ///
498    /// The `initial` function indicates the time at which each element of the Collection should appear.
499    ///
500    /// # Examples
501    ///
502    /// ```
503    /// use timely::dataflow::Scope;
504    /// use palimpsest_dataflow::input::Input;
505    ///
506    /// ::timely::example(|scope| {
507    ///
508    ///     let data = scope.new_collection_from(1 .. 10).1;
509    ///
510    ///     let result = scope.iterative::<u64,_,_>(|child| {
511    ///         data.enter_at(child, |x| *x)
512    ///             .leave()
513    ///     });
514    ///
515    ///     data.assert_eq(&result);
516    /// });
517    /// ```
518    pub fn enter_at<'a, T, F>(
519        &self,
520        child: &Iterative<'a, G, T>,
521        mut initial: F,
522    ) -> VecCollection<Iterative<'a, G, T>, D, R>
523    where
524        T: Timestamp + Hash,
525        F: FnMut(&D) -> T + Clone + 'static,
526        G::Timestamp: Hash,
527    {
528        self.inner
529            .enter(child)
530            .map(move |(data, time, diff)| {
531                let new_time = Product::new(time, initial(&data));
532                (data, new_time, diff)
533            })
534            .as_collection()
535    }
536
537    /// Delays each difference by a supplied function.
538    ///
539    /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
540    /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
541    /// ordered, they should have the same order or compare equal once `func` is applied to them (this
542    /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
543    /// to all of the data timestamps).
544    pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
545    where
546        G::Timestamp: Hash,
547        F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
548    {
549        let mut func1 = func.clone();
550        let mut func2 = func.clone();
551
552        self.inner
553            .delay_batch(move |x| func1(x))
554            .map_in_place(move |x| x.1 = func2(&x.1))
555            .as_collection()
556    }
557
558    /// Applies a supplied function to each update.
559    ///
560    /// This method is most commonly used to report information back to the user, often for debugging purposes.
561    /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
562    /// not guarantee that it will be called as many times as you might expect.
563    ///
564    /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
565    /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
566    /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
567    /// interesting and less intuitive, unfortunately.
568    ///
569    /// # Examples
570    ///
571    /// ```
572    /// use palimpsest_dataflow::input::Input;
573    ///
574    /// ::timely::example(|scope| {
575    ///     scope.new_collection_from(1 .. 10).1
576    ///          .map_in_place(|x| *x *= 2)
577    ///          .filter(|x| x % 2 == 1)
578    ///          .inspect(|x| println!("error: {:?}", x));
579    /// });
580    /// ```
581    pub fn inspect<F>(&self, func: F) -> VecCollection<G, D, R>
582    where
583        F: FnMut(&(D, G::Timestamp, R)) + 'static,
584    {
585        self.inner.inspect(func).as_collection()
586    }
587    /// Applies a supplied function to each batch of updates.
588    ///
589    /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
590    /// timely dataflow capability associated with the batch of updates. The observed batching depends
591    /// on how the system executes, and may vary run to run.
592    ///
593    /// # Examples
594    ///
595    /// ```
596    /// use palimpsest_dataflow::input::Input;
597    ///
598    /// ::timely::example(|scope| {
599    ///     scope.new_collection_from(1 .. 10).1
600    ///          .map_in_place(|x| *x *= 2)
601    ///          .filter(|x| x % 2 == 1)
602    ///          .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
603    /// });
604    /// ```
605    pub fn inspect_batch<F>(&self, mut func: F) -> VecCollection<G, D, R>
606    where
607        F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)]) + 'static,
608    {
609        self.inner
610            .inspect_batch(move |time, data| func(time, data))
611            .as_collection()
612    }
613
614    /// Assert if the collection is ever non-empty.
615    ///
616    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
617    /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
618    /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
619    /// program should indicate that this assertion never found cause to complain.
620    ///
621    /// # Examples
622    ///
623    /// ```
624    /// use palimpsest_dataflow::input::Input;
625    ///
626    /// ::timely::example(|scope| {
627    ///     scope.new_collection_from(1 .. 10).1
628    ///          .map(|x| x * 2)
629    ///          .filter(|x| x % 2 == 1)
630    ///          .assert_empty();
631    /// });
632    /// ```
633    pub fn assert_empty(&self)
634    where
635        D: crate::ExchangeData + Hashable,
636        R: crate::ExchangeData + Hashable + Semigroup,
637        G::Timestamp: Lattice + Ord,
638    {
639        self.consolidate()
640            .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
641    }
642}
643
644use timely::dataflow::scopes::ScopeParent;
645use timely::progress::timestamp::Refines;
646
647/// Methods requiring a nested scope.
648impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
649where
650    C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
651    T: Refines<<G as ScopeParent>::Timestamp>,
652{
653    /// Returns the final value of a Collection from a nested scope to its containing scope.
654    ///
655    /// # Examples
656    ///
657    /// ```
658    /// use timely::dataflow::Scope;
659    /// use palimpsest_dataflow::input::Input;
660    ///
661    /// ::timely::example(|scope| {
662    ///
663    ///    let data = scope.new_collection_from(1 .. 10).1;
664    ///
665    ///    let result = scope.region(|child| {
666    ///         data.enter(child)
667    ///             .leave()
668    ///     });
669    ///
670    ///     data.assert_eq(&result);
671    /// });
672    /// ```
673    pub fn leave(
674        &self,
675    ) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
676        use timely::dataflow::channels::pact::Pipeline;
677        self.inner
678            .leave()
679            .unary(Pipeline, "Leave", move |_, _| {
680                move |input, output| {
681                    input.for_each(|time, data| {
682                        output
683                            .session(&time)
684                            .give_container(&mut std::mem::take(data).leave())
685                    });
686                }
687            })
688            .as_collection()
689    }
690}
691
692/// Methods requiring a region as the scope.
693impl<G: Scope, C: Container + Data> Collection<Child<'_, G, G::Timestamp>, C> {
694    /// Returns the value of a Collection from a nested region to its containing scope.
695    ///
696    /// This method is a specialization of `leave` to the case that of a nested region.
697    /// It removes the need for an operator that adjusts the timestamp.
698    pub fn leave_region(&self) -> Collection<G, C> {
699        self.inner.leave().as_collection()
700    }
701}
702
703/// Methods requiring an Abelian difference, to support negation.
704impl<G: Scope<Timestamp: Data>, D: Clone + 'static, R: Abelian + 'static> VecCollection<G, D, R> {
705    /// Assert if the collections are ever different.
706    ///
707    /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
708    /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
709    /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
710    /// indicate that this assertion never found cause to complain.
711    ///
712    /// # Examples
713    ///
714    /// ```
715    /// use palimpsest_dataflow::input::Input;
716    ///
717    /// ::timely::example(|scope| {
718    ///
719    ///     let data = scope.new_collection_from(1 .. 10).1;
720    ///
721    ///     let odds = data.filter(|x| x % 2 == 1);
722    ///     let evens = data.filter(|x| x % 2 == 0);
723    ///
724    ///     odds.concat(&evens)
725    ///         .assert_eq(&data);
726    /// });
727    /// ```
728    pub fn assert_eq(&self, other: &Self)
729    where
730        D: crate::ExchangeData + Hashable,
731        R: crate::ExchangeData + Hashable,
732        G::Timestamp: Lattice + Ord,
733    {
734        self.negate().concat(other).assert_empty();
735    }
736}
737
/// Conversion to a differential dataflow Collection.
///
/// `G` is the scope of the resulting collection and `C` is its container type.
pub trait AsCollection<G: Scope, C> {
    /// Converts the type to a differential dataflow collection.
    fn as_collection(&self) -> Collection<G, C>;
}
743
744impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
745    /// Converts the type to a differential dataflow collection.
746    ///
747    /// By calling this method, you guarantee that the timestamp invariant (as documented on
748    /// [Collection]) is upheld. This method will not check it.
749    fn as_collection(&self) -> Collection<G, C> {
750        Collection::<G, C>::new(self.clone())
751    }
752}
753
754/// Concatenates multiple collections.
755///
756/// This method has the effect of a sequence of calls to `concat`, but it does
757/// so in one operator rather than a chain of many operators.
758///
759/// # Examples
760///
761/// ```
762/// use palimpsest_dataflow::input::Input;
763///
764/// ::timely::example(|scope| {
765///
766///     let data = scope.new_collection_from(1 .. 10).1;
767///
768///     let odds = data.filter(|x| x % 2 == 1);
769///     let evens = data.filter(|x| x % 2 == 0);
770///
771///     palimpsest_dataflow::collection::concatenate(scope, vec![odds, evens])
772///         .assert_eq(&data);
773/// });
774/// ```
775pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
776where
777    G: Scope,
778    C: Container,
779    I: IntoIterator<Item = Collection<G, C>>,
780{
781    scope
782        .concatenate(iterator.into_iter().map(|x| x.inner))
783        .as_collection()
784}
785
786/// Traits that can be implemented by containers to provide functionality to collections based on them.
787pub mod containers {
788
789    use crate::collection::Abelian;
790    use timely::progress::{timestamp::Refines, Timestamp};
791
792    /// A container that can negate its updates.
793    pub trait Negate {
794        /// Negates Abelian differences of each update.
795        fn negate(self) -> Self;
796    }
797    impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
798        fn negate(mut self) -> Self {
799            for (_data, _time, diff) in self.iter_mut() {
800                diff.negate();
801            }
802            self
803        }
804    }
805
806    /// A container that can enter from timestamp `T1` into timestamp `T2`.
807    pub trait Enter<T1, T2> {
808        /// The resulting container type.
809        type InnerContainer;
810        /// Update timestamps from `T1` to `T2`.
811        fn enter(self) -> Self::InnerContainer;
812    }
813    impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
814        type InnerContainer = Vec<(D, T2, R)>;
815        fn enter(self) -> Self::InnerContainer {
816            self.into_iter()
817                .map(|(d, t1, r)| (d, T2::to_inner(t1), r))
818                .collect()
819        }
820    }
821
822    /// A container that can leave from timestamp `T1` into timestamp `T2`.
823    pub trait Leave<T1, T2> {
824        /// The resulting container type.
825        type OuterContainer;
826        /// Update timestamps from `T1` to `T2`.
827        fn leave(self) -> Self::OuterContainer;
828    }
829    impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
830        type OuterContainer = Vec<(D, T2, R)>;
831        fn leave(self) -> Self::OuterContainer {
832            self.into_iter()
833                .map(|(d, t1, r)| (d, t1.to_outer(), r))
834                .collect()
835        }
836    }
837
838    /// A container that can advance timestamps by a summary `TS`.
839    pub trait ResultsIn<TS> {
840        /// Advance times in the container by `step`.
841        fn results_in(self, step: &TS) -> Self;
842    }
843    impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
844        fn results_in(self, step: &T::Summary) -> Self {
845            use timely::progress::PathSummary;
846            self.into_iter()
847                .filter_map(move |(d, t, r)| step.results_in(&t).map(|t| (d, t, r)))
848                .collect()
849        }
850    }
851}