palimpsest_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use std::hash::Hash;
12
13use timely::dataflow::operators::*;
14use timely::dataflow::scopes::{child::Iterative, Child};
15use timely::dataflow::Scope;
16use timely::dataflow::StreamCore;
17use timely::order::Product;
18use timely::progress::Timestamp;
19use timely::{Container, Data};
20
21use crate::difference::{Abelian, Multiply, Semigroup};
22use crate::hashable::Hashable;
23use crate::lattice::Lattice;
24
25/// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
26///
27/// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
28/// differential dataflow computation, you write as if the collection is a static dataset to which you
29/// apply functional transformations, creating new collections. Once your computation is written, you
30/// are able to mutate the collection (by inserting and removing elements); differential dataflow will
31/// propagate changes through your functional computation and report the corresponding changes to the
32/// output collections.
33///
34/// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the
35/// collection exists; as you write more complicated programs you may wish to introduce nested scopes
36/// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
37/// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
38/// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
39/// defaults to) `isize`, representing changes to the occurrence count of each record.
40///
41/// This type definition instantiates the [`Collection`] type with a `Vec<(D, G::Timestamp, R)>`.
42pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
43
44/// An evolving collection represented by a stream of abstract containers.
45///
46/// The containers purport to reperesent changes to a collection, and they must implement various traits
47/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
48/// on the containers, and streams of containers, are left to the container implementor to describe.
49#[derive(Clone)]
50pub struct Collection<G: Scope, C> {
51 /// The underlying timely dataflow stream.
52 ///
53 /// This field is exposed to support direct timely dataflow manipulation when required, but it is
54 /// not intended to be the idiomatic way to work with the collection.
55 ///
56 /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
57 /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
58 /// unexpectedly.
59 pub inner: timely::dataflow::StreamCore<G, C>,
60}
61
62impl<G: Scope, C> Collection<G, C> {
63 /// Creates a new Collection from a timely dataflow stream.
64 ///
65 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
66 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
67 /// provides a `new_collection` method which will create a new collection for you without exposing
68 /// the underlying timely stream at all.
69 ///
70 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
71 /// method does not check it.
72 pub fn new(stream: StreamCore<G, C>) -> Self {
73 Self { inner: stream }
74 }
75}
76impl<G: Scope, C: Container> Collection<G, C> {
77 /// Creates a new collection accumulating the contents of the two collections.
78 ///
79 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
80 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
81 /// two collections.
82 ///
83 /// # Examples
84 ///
85 /// ```
86 /// use palimpsest_dataflow::input::Input;
87 ///
88 /// ::timely::example(|scope| {
89 ///
90 /// let data = scope.new_collection_from(1 .. 10).1;
91 ///
92 /// let odds = data.filter(|x| x % 2 == 1);
93 /// let evens = data.filter(|x| x % 2 == 0);
94 ///
95 /// odds.concat(&evens)
96 /// .assert_eq(&data);
97 /// });
98 /// ```
99 pub fn concat(&self, other: &Self) -> Self {
100 self.inner.concat(&other.inner).as_collection()
101 }
102 /// Creates a new collection accumulating the contents of the two collections.
103 ///
104 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
105 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
106 /// two collections.
107 ///
108 /// # Examples
109 ///
110 /// ```
111 /// use palimpsest_dataflow::input::Input;
112 ///
113 /// ::timely::example(|scope| {
114 ///
115 /// let data = scope.new_collection_from(1 .. 10).1;
116 ///
117 /// let odds = data.filter(|x| x % 2 == 1);
118 /// let evens = data.filter(|x| x % 2 == 0);
119 ///
120 /// odds.concatenate(Some(evens))
121 /// .assert_eq(&data);
122 /// });
123 /// ```
124 pub fn concatenate<I>(&self, sources: I) -> Self
125 where
126 I: IntoIterator<Item = Self>,
127 {
128 self.inner
129 .concatenate(sources.into_iter().map(|x| x.inner))
130 .as_collection()
131 }
132 // Brings a Collection into a nested region.
133 ///
134 /// This method is a specialization of `enter` to the case where the nested scope is a region.
135 /// It removes the need for an operator that adjusts the timestamp.
136 pub fn enter_region<'a>(
137 &self,
138 child: &Child<'a, G, <G as ScopeParent>::Timestamp>,
139 ) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
140 self.inner.enter(child).as_collection()
141 }
142 /// Applies a supplied function to each batch of updates.
143 ///
144 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
145 /// timely dataflow capability associated with the batch of updates. The observed batching depends
146 /// on how the system executes, and may vary run to run.
147 ///
148 /// # Examples
149 ///
150 /// ```
151 /// use palimpsest_dataflow::input::Input;
152 ///
153 /// ::timely::example(|scope| {
154 /// scope.new_collection_from(1 .. 10).1
155 /// .map_in_place(|x| *x *= 2)
156 /// .filter(|x| x % 2 == 1)
157 /// .inspect_container(|event| println!("event: {:?}", event));
158 /// });
159 /// ```
160 pub fn inspect_container<F>(&self, func: F) -> Self
161 where
162 F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>) + 'static,
163 {
164 self.inner.inspect_container(func).as_collection()
165 }
166 /// Attaches a timely dataflow probe to the output of a Collection.
167 ///
168 /// This probe is used to determine when the state of the Collection has stabilized and can
169 /// be read out.
170 pub fn probe(&self) -> probe::Handle<G::Timestamp> {
171 self.inner.probe()
172 }
173 /// Attaches a timely dataflow probe to the output of a Collection.
174 ///
175 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
176 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
177 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
178 /// avoid swamping the system.
179 pub fn probe_with(&self, handle: &probe::Handle<G::Timestamp>) -> Self {
180 Self::new(self.inner.probe_with(handle))
181 }
182 /// The scope containing the underlying timely dataflow stream.
183 pub fn scope(&self) -> G {
184 self.inner.scope()
185 }
186
187 /// Creates a new collection whose counts are the negation of those in the input.
188 ///
189 /// This method is most commonly used with `concat` to get those element in one collection but not another.
190 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
191 /// including negative counts.
192 ///
193 /// # Examples
194 ///
195 /// ```
196 /// use palimpsest_dataflow::input::Input;
197 ///
198 /// ::timely::example(|scope| {
199 ///
200 /// let data = scope.new_collection_from(1 .. 10).1;
201 ///
202 /// let odds = data.filter(|x| x % 2 == 1);
203 /// let evens = data.filter(|x| x % 2 == 0);
204 ///
205 /// odds.negate()
206 /// .concat(&data)
207 /// .assert_eq(&evens);
208 /// });
209 /// ```
210 pub fn negate(&self) -> Self
211 where
212 C: containers::Negate,
213 {
214 use timely::dataflow::channels::pact::Pipeline;
215 self.inner
216 .unary(Pipeline, "Negate", move |_, _| {
217 move |input, output| {
218 input.for_each(|time, data| {
219 output
220 .session(&time)
221 .give_container(&mut std::mem::take(data).negate())
222 });
223 }
224 })
225 .as_collection()
226 }
227
228 /// Brings a Collection into a nested scope.
229 ///
230 /// # Examples
231 ///
232 /// ```
233 /// use timely::dataflow::Scope;
234 /// use palimpsest_dataflow::input::Input;
235 ///
236 /// ::timely::example(|scope| {
237 ///
238 /// let data = scope.new_collection_from(1 .. 10).1;
239 ///
240 /// let result = scope.region(|child| {
241 /// data.enter(child)
242 /// .leave()
243 /// });
244 ///
245 /// data.assert_eq(&result);
246 /// });
247 /// ```
248 pub fn enter<'a, T>(
249 &self,
250 child: &Child<'a, G, T>,
251 ) -> Collection<
252 Child<'a, G, T>,
253 <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer,
254 >
255 where
256 C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
257 T: Refines<<G as ScopeParent>::Timestamp>,
258 {
259 use timely::dataflow::channels::pact::Pipeline;
260 self.inner
261 .enter(child)
262 .unary(Pipeline, "Enter", move |_, _| {
263 move |input, output| {
264 input.for_each(|time, data| {
265 output
266 .session(&time)
267 .give_container(&mut std::mem::take(data).enter())
268 });
269 }
270 })
271 .as_collection()
272 }
273
274 /// Advances a timestamp in the stream according to the timestamp actions on the path.
275 ///
276 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
277 /// incrementing fields would result in integer overflow. In this case, the record is dropped.
278 ///
279 /// # Examples
280 /// ```
281 /// use timely::dataflow::Scope;
282 /// use timely::dataflow::operators::{ToStream, Concat, Inspect, BranchWhen};
283 ///
284 /// use palimpsest_dataflow::input::Input;
285 ///
286 /// timely::example(|scope| {
287 /// let summary1 = 5;
288 ///
289 /// let data = scope.new_collection_from(1 .. 10).1;
290 /// /// Applies `results_in` on every timestamp in the collection.
291 /// data.results_in(summary1);
292 /// });
293 /// ```
294 pub fn results_in(&self, step: <G::Timestamp as Timestamp>::Summary) -> Self
295 where
296 C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
297 {
298 use timely::dataflow::channels::pact::Pipeline;
299 self.inner
300 .unary(Pipeline, "ResultsIn", move |_, _| {
301 move |input, output| {
302 input.for_each(|time, data| {
303 output
304 .session(&time)
305 .give_container(&mut std::mem::take(data).results_in(&step))
306 });
307 }
308 })
309 .as_collection()
310 }
311}
312
313impl<G: Scope, D: Clone + 'static, R: Clone + 'static> VecCollection<G, D, R> {
314 /// Creates a new collection by applying the supplied function to each input element.
315 ///
316 /// # Examples
317 ///
318 /// ```
319 /// use palimpsest_dataflow::input::Input;
320 ///
321 /// ::timely::example(|scope| {
322 /// scope.new_collection_from(1 .. 10).1
323 /// .map(|x| x * 2)
324 /// .filter(|x| x % 2 == 1)
325 /// .assert_empty();
326 /// });
327 /// ```
328 pub fn map<D2, L>(&self, mut logic: L) -> VecCollection<G, D2, R>
329 where
330 D2: Data,
331 L: FnMut(D) -> D2 + 'static,
332 {
333 self.inner
334 .map(move |(data, time, delta)| (logic(data), time, delta))
335 .as_collection()
336 }
337 /// Creates a new collection by applying the supplied function to each input element.
338 ///
339 /// Although the name suggests in-place mutation, this function does not change the source collection,
340 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
341 /// equivalent to `map`, but can be more efficient.
342 ///
343 /// # Examples
344 ///
345 /// ```
346 /// use palimpsest_dataflow::input::Input;
347 ///
348 /// ::timely::example(|scope| {
349 /// scope.new_collection_from(1 .. 10).1
350 /// .map_in_place(|x| *x *= 2)
351 /// .filter(|x| x % 2 == 1)
352 /// .assert_empty();
353 /// });
354 /// ```
355 pub fn map_in_place<L>(&self, mut logic: L) -> VecCollection<G, D, R>
356 where
357 L: FnMut(&mut D) + 'static,
358 {
359 self.inner
360 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
361 .as_collection()
362 }
363 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
364 ///
365 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
366 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
367 /// attempting to consolidate the results.
368 ///
369 /// # Examples
370 ///
371 /// ```
372 /// use palimpsest_dataflow::input::Input;
373 ///
374 /// ::timely::example(|scope| {
375 /// scope.new_collection_from(1 .. 10).1
376 /// .flat_map(|x| 0 .. x);
377 /// });
378 /// ```
379 pub fn flat_map<I, L>(&self, mut logic: L) -> VecCollection<G, I::Item, R>
380 where
381 G::Timestamp: Clone,
382 I: IntoIterator<Item: Data>,
383 L: FnMut(D) -> I + 'static,
384 {
385 self.inner
386 .flat_map(move |(data, time, delta)| {
387 logic(data)
388 .into_iter()
389 .map(move |x| (x, time.clone(), delta.clone()))
390 })
391 .as_collection()
392 }
393 /// Creates a new collection containing those input records satisfying the supplied predicate.
394 ///
395 /// # Examples
396 ///
397 /// ```
398 /// use palimpsest_dataflow::input::Input;
399 ///
400 /// ::timely::example(|scope| {
401 /// scope.new_collection_from(1 .. 10).1
402 /// .map(|x| x * 2)
403 /// .filter(|x| x % 2 == 1)
404 /// .assert_empty();
405 /// });
406 /// ```
407 pub fn filter<L>(&self, mut logic: L) -> VecCollection<G, D, R>
408 where
409 L: FnMut(&D) -> bool + 'static,
410 {
411 self.inner
412 .filter(move |(data, _, _)| logic(data))
413 .as_collection()
414 }
415 /// Replaces each record with another, with a new difference type.
416 ///
417 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
418 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
419 ///
420 /// # Examples
421 ///
422 /// ```
423 /// use palimpsest_dataflow::input::Input;
424 ///
425 /// ::timely::example(|scope| {
426 ///
427 /// let nums = scope.new_collection_from(0 .. 10).1;
428 /// let x1 = nums.flat_map(|x| 0 .. x);
429 /// let x2 = nums.map(|x| (x, 9 - x))
430 /// .explode(|(x,y)| Some((x,y)));
431 ///
432 /// x1.assert_eq(&x2);
433 /// });
434 /// ```
435 pub fn explode<D2, R2, I, L>(
436 &self,
437 mut logic: L,
438 ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
439 where
440 D2: Data,
441 R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
442 I: IntoIterator<Item = (D2, R2)>,
443 L: FnMut(D) -> I + 'static,
444 {
445 self.inner
446 .flat_map(move |(x, t, d)| {
447 logic(x)
448 .into_iter()
449 .map(move |(x, d2)| (x, t.clone(), d2.multiply(&d)))
450 })
451 .as_collection()
452 }
453
454 /// Joins each record against a collection defined by the function `logic`.
455 ///
456 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
457 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
458 /// modifications made to the results, namely joining timestamps and multiplying differences.
459 ///
460 /// #Examples
461 ///
462 /// ```
463 /// use palimpsest_dataflow::input::Input;
464 ///
465 /// ::timely::example(|scope| {
466 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
467 /// // for x from 0 through 9.
468 /// scope.new_collection_from(0 .. 10isize).1
469 /// .join_function(|x|
470 /// // data time diff
471 /// vec![(2*x, (3*x) as u64, x),
472 /// (2*x, (4*x) as u64, -x)]
473 /// );
474 /// });
475 /// ```
476 pub fn join_function<D2, R2, I, L>(
477 &self,
478 mut logic: L,
479 ) -> VecCollection<G, D2, <R2 as Multiply<R>>::Output>
480 where
481 G::Timestamp: Lattice,
482 D2: Data,
483 R2: Semigroup + Multiply<R, Output: Semigroup + 'static>,
484 I: IntoIterator<Item = (D2, G::Timestamp, R2)>,
485 L: FnMut(D) -> I + 'static,
486 {
487 self.inner
488 .flat_map(move |(x, t, d)| {
489 logic(x)
490 .into_iter()
491 .map(move |(x, t2, d2)| (x, t.join(&t2), d2.multiply(&d)))
492 })
493 .as_collection()
494 }
495
496 /// Brings a Collection into a nested scope, at varying times.
497 ///
498 /// The `initial` function indicates the time at which each element of the Collection should appear.
499 ///
500 /// # Examples
501 ///
502 /// ```
503 /// use timely::dataflow::Scope;
504 /// use palimpsest_dataflow::input::Input;
505 ///
506 /// ::timely::example(|scope| {
507 ///
508 /// let data = scope.new_collection_from(1 .. 10).1;
509 ///
510 /// let result = scope.iterative::<u64,_,_>(|child| {
511 /// data.enter_at(child, |x| *x)
512 /// .leave()
513 /// });
514 ///
515 /// data.assert_eq(&result);
516 /// });
517 /// ```
518 pub fn enter_at<'a, T, F>(
519 &self,
520 child: &Iterative<'a, G, T>,
521 mut initial: F,
522 ) -> VecCollection<Iterative<'a, G, T>, D, R>
523 where
524 T: Timestamp + Hash,
525 F: FnMut(&D) -> T + Clone + 'static,
526 G::Timestamp: Hash,
527 {
528 self.inner
529 .enter(child)
530 .map(move |(data, time, diff)| {
531 let new_time = Product::new(time, initial(&data));
532 (data, new_time, diff)
533 })
534 .as_collection()
535 }
536
537 /// Delays each difference by a supplied function.
538 ///
539 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
540 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
541 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
542 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
543 /// to all of the data timestamps).
544 pub fn delay<F>(&self, func: F) -> VecCollection<G, D, R>
545 where
546 G::Timestamp: Hash,
547 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
548 {
549 let mut func1 = func.clone();
550 let mut func2 = func.clone();
551
552 self.inner
553 .delay_batch(move |x| func1(x))
554 .map_in_place(move |x| x.1 = func2(&x.1))
555 .as_collection()
556 }
557
558 /// Applies a supplied function to each update.
559 ///
560 /// This method is most commonly used to report information back to the user, often for debugging purposes.
561 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
562 /// not guarantee that it will be called as many times as you might expect.
563 ///
564 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
565 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
566 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
567 /// interesting and less intuitive, unfortunately.
568 ///
569 /// # Examples
570 ///
571 /// ```
572 /// use palimpsest_dataflow::input::Input;
573 ///
574 /// ::timely::example(|scope| {
575 /// scope.new_collection_from(1 .. 10).1
576 /// .map_in_place(|x| *x *= 2)
577 /// .filter(|x| x % 2 == 1)
578 /// .inspect(|x| println!("error: {:?}", x));
579 /// });
580 /// ```
581 pub fn inspect<F>(&self, func: F) -> VecCollection<G, D, R>
582 where
583 F: FnMut(&(D, G::Timestamp, R)) + 'static,
584 {
585 self.inner.inspect(func).as_collection()
586 }
587 /// Applies a supplied function to each batch of updates.
588 ///
589 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
590 /// timely dataflow capability associated with the batch of updates. The observed batching depends
591 /// on how the system executes, and may vary run to run.
592 ///
593 /// # Examples
594 ///
595 /// ```
596 /// use palimpsest_dataflow::input::Input;
597 ///
598 /// ::timely::example(|scope| {
599 /// scope.new_collection_from(1 .. 10).1
600 /// .map_in_place(|x| *x *= 2)
601 /// .filter(|x| x % 2 == 1)
602 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
603 /// });
604 /// ```
605 pub fn inspect_batch<F>(&self, mut func: F) -> VecCollection<G, D, R>
606 where
607 F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)]) + 'static,
608 {
609 self.inner
610 .inspect_batch(move |time, data| func(time, data))
611 .as_collection()
612 }
613
614 /// Assert if the collection is ever non-empty.
615 ///
616 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
617 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
618 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
619 /// program should indicate that this assertion never found cause to complain.
620 ///
621 /// # Examples
622 ///
623 /// ```
624 /// use palimpsest_dataflow::input::Input;
625 ///
626 /// ::timely::example(|scope| {
627 /// scope.new_collection_from(1 .. 10).1
628 /// .map(|x| x * 2)
629 /// .filter(|x| x % 2 == 1)
630 /// .assert_empty();
631 /// });
632 /// ```
633 pub fn assert_empty(&self)
634 where
635 D: crate::ExchangeData + Hashable,
636 R: crate::ExchangeData + Hashable + Semigroup,
637 G::Timestamp: Lattice + Ord,
638 {
639 self.consolidate()
640 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
641 }
642}
643
644use timely::dataflow::scopes::ScopeParent;
645use timely::progress::timestamp::Refines;
646
647/// Methods requiring a nested scope.
648impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
649where
650 C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
651 T: Refines<<G as ScopeParent>::Timestamp>,
652{
653 /// Returns the final value of a Collection from a nested scope to its containing scope.
654 ///
655 /// # Examples
656 ///
657 /// ```
658 /// use timely::dataflow::Scope;
659 /// use palimpsest_dataflow::input::Input;
660 ///
661 /// ::timely::example(|scope| {
662 ///
663 /// let data = scope.new_collection_from(1 .. 10).1;
664 ///
665 /// let result = scope.region(|child| {
666 /// data.enter(child)
667 /// .leave()
668 /// });
669 ///
670 /// data.assert_eq(&result);
671 /// });
672 /// ```
673 pub fn leave(
674 &self,
675 ) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
676 use timely::dataflow::channels::pact::Pipeline;
677 self.inner
678 .leave()
679 .unary(Pipeline, "Leave", move |_, _| {
680 move |input, output| {
681 input.for_each(|time, data| {
682 output
683 .session(&time)
684 .give_container(&mut std::mem::take(data).leave())
685 });
686 }
687 })
688 .as_collection()
689 }
690}
691
692/// Methods requiring a region as the scope.
693impl<G: Scope, C: Container + Data> Collection<Child<'_, G, G::Timestamp>, C> {
694 /// Returns the value of a Collection from a nested region to its containing scope.
695 ///
696 /// This method is a specialization of `leave` to the case that of a nested region.
697 /// It removes the need for an operator that adjusts the timestamp.
698 pub fn leave_region(&self) -> Collection<G, C> {
699 self.inner.leave().as_collection()
700 }
701}
702
703/// Methods requiring an Abelian difference, to support negation.
704impl<G: Scope<Timestamp: Data>, D: Clone + 'static, R: Abelian + 'static> VecCollection<G, D, R> {
705 /// Assert if the collections are ever different.
706 ///
707 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
708 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
709 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
710 /// indicate that this assertion never found cause to complain.
711 ///
712 /// # Examples
713 ///
714 /// ```
715 /// use palimpsest_dataflow::input::Input;
716 ///
717 /// ::timely::example(|scope| {
718 ///
719 /// let data = scope.new_collection_from(1 .. 10).1;
720 ///
721 /// let odds = data.filter(|x| x % 2 == 1);
722 /// let evens = data.filter(|x| x % 2 == 0);
723 ///
724 /// odds.concat(&evens)
725 /// .assert_eq(&data);
726 /// });
727 /// ```
728 pub fn assert_eq(&self, other: &Self)
729 where
730 D: crate::ExchangeData + Hashable,
731 R: crate::ExchangeData + Hashable,
732 G::Timestamp: Lattice + Ord,
733 {
734 self.negate().concat(other).assert_empty();
735 }
736}
737
738/// Conversion to a differential dataflow Collection.
739pub trait AsCollection<G: Scope, C> {
740 /// Converts the type to a differential dataflow collection.
741 fn as_collection(&self) -> Collection<G, C>;
742}
743
744impl<G: Scope, C: Clone> AsCollection<G, C> for StreamCore<G, C> {
745 /// Converts the type to a differential dataflow collection.
746 ///
747 /// By calling this method, you guarantee that the timestamp invariant (as documented on
748 /// [Collection]) is upheld. This method will not check it.
749 fn as_collection(&self) -> Collection<G, C> {
750 Collection::<G, C>::new(self.clone())
751 }
752}
753
754/// Concatenates multiple collections.
755///
756/// This method has the effect of a sequence of calls to `concat`, but it does
757/// so in one operator rather than a chain of many operators.
758///
759/// # Examples
760///
761/// ```
762/// use palimpsest_dataflow::input::Input;
763///
764/// ::timely::example(|scope| {
765///
766/// let data = scope.new_collection_from(1 .. 10).1;
767///
768/// let odds = data.filter(|x| x % 2 == 1);
769/// let evens = data.filter(|x| x % 2 == 0);
770///
771/// palimpsest_dataflow::collection::concatenate(scope, vec![odds, evens])
772/// .assert_eq(&data);
773/// });
774/// ```
775pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
776where
777 G: Scope,
778 C: Container,
779 I: IntoIterator<Item = Collection<G, C>>,
780{
781 scope
782 .concatenate(iterator.into_iter().map(|x| x.inner))
783 .as_collection()
784}
785
786/// Traits that can be implemented by containers to provide functionality to collections based on them.
787pub mod containers {
788
789 use crate::collection::Abelian;
790 use timely::progress::{timestamp::Refines, Timestamp};
791
792 /// A container that can negate its updates.
793 pub trait Negate {
794 /// Negates Abelian differences of each update.
795 fn negate(self) -> Self;
796 }
797 impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
798 fn negate(mut self) -> Self {
799 for (_data, _time, diff) in self.iter_mut() {
800 diff.negate();
801 }
802 self
803 }
804 }
805
806 /// A container that can enter from timestamp `T1` into timestamp `T2`.
807 pub trait Enter<T1, T2> {
808 /// The resulting container type.
809 type InnerContainer;
810 /// Update timestamps from `T1` to `T2`.
811 fn enter(self) -> Self::InnerContainer;
812 }
813 impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
814 type InnerContainer = Vec<(D, T2, R)>;
815 fn enter(self) -> Self::InnerContainer {
816 self.into_iter()
817 .map(|(d, t1, r)| (d, T2::to_inner(t1), r))
818 .collect()
819 }
820 }
821
822 /// A container that can leave from timestamp `T1` into timestamp `T2`.
823 pub trait Leave<T1, T2> {
824 /// The resulting container type.
825 type OuterContainer;
826 /// Update timestamps from `T1` to `T2`.
827 fn leave(self) -> Self::OuterContainer;
828 }
829 impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
830 type OuterContainer = Vec<(D, T2, R)>;
831 fn leave(self) -> Self::OuterContainer {
832 self.into_iter()
833 .map(|(d, t1, r)| (d, t1.to_outer(), r))
834 .collect()
835 }
836 }
837
838 /// A container that can advance timestamps by a summary `TS`.
839 pub trait ResultsIn<TS> {
840 /// Advance times in the container by `step`.
841 fn results_in(self, step: &TS) -> Self;
842 }
843 impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
844 fn results_in(self, step: &T::Summary) -> Self {
845 use timely::progress::PathSummary;
846 self.into_iter()
847 .filter_map(move |(d, t, r)| step.results_in(&t).map(|t| (d, t, r)))
848 .collect()
849 }
850 }
851}