differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher level of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use timely::Container;
12use timely::progress::Timestamp;
13use timely::dataflow::scopes::Child;
14use timely::dataflow::{Scope, Stream};
15use timely::dataflow::operators::*;
16
17use crate::difference::Abelian;
18
/// An evolving collection represented by a stream of abstract containers.
///
/// The containers purport to represent changes to a collection, and they must implement various traits
/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
/// on the containers, and streams of containers, are left to the container implementor to describe.
#[derive(Clone)]
pub struct Collection<G: Scope, C: 'static> {
    /// The underlying timely dataflow stream.
    ///
    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
    /// not intended to be the idiomatic way to work with the collection.
    ///
    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
    /// unexpectedly.
    pub inner: Stream<G, C>,
}
36
37impl<G: Scope, C> Collection<G, C> {
38 /// Creates a new Collection from a timely dataflow stream.
39 ///
40 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
41 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
42 /// provides a `new_collection` method which will create a new collection for you without exposing
43 /// the underlying timely stream at all.
44 ///
45 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
46 /// method does not check it.
47 pub fn new(stream: Stream<G, C>) -> Self { Self { inner: stream } }
48}
49impl<G: Scope, C: Container> Collection<G, C> {
50 /// Creates a new collection accumulating the contents of the two collections.
51 ///
52 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
53 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
54 /// two collections.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// use differential_dataflow::input::Input;
60 ///
61 /// ::timely::example(|scope| {
62 ///
63 /// let data = scope.new_collection_from(1 .. 10).1;
64 ///
65 /// let odds = data.clone().filter(|x| x % 2 == 1);
66 /// let evens = data.clone().filter(|x| x % 2 == 0);
67 ///
68 /// odds.concat(evens)
69 /// .assert_eq(data);
70 /// });
71 /// ```
72 pub fn concat(self, other: Self) -> Self {
73 self.inner
74 .concat(other.inner)
75 .as_collection()
76 }
77 /// Creates a new collection accumulating the contents of the two collections.
78 ///
79 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
80 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
81 /// two collections.
82 ///
83 /// # Examples
84 ///
85 /// ```
86 /// use differential_dataflow::input::Input;
87 ///
88 /// ::timely::example(|scope| {
89 ///
90 /// let data = scope.new_collection_from(1 .. 10).1;
91 ///
92 /// let odds = data.clone().filter(|x| x % 2 == 1);
93 /// let evens = data.clone().filter(|x| x % 2 == 0);
94 ///
95 /// odds.concatenate(Some(evens))
96 /// .assert_eq(data);
97 /// });
98 /// ```
99 pub fn concatenate<I>(self, sources: I) -> Self
100 where
101 I: IntoIterator<Item=Self>
102 {
103 self.inner
104 .scope()
105 .concatenate(sources.into_iter().map(|x| x.inner).chain([self.inner]))
106 .as_collection()
107 }
108 // Brings a Collection into a nested region.
109 ///
110 /// This method is a specialization of `enter` to the case where the nested scope is a region.
111 /// It removes the need for an operator that adjusts the timestamp.
112 pub fn enter_region<'a>(self, child: &Child<'a, G, <G as ScopeParent>::Timestamp>) -> Collection<Child<'a, G, <G as ScopeParent>::Timestamp>, C> {
113 self.inner
114 .enter(child)
115 .as_collection()
116 }
117 /// Applies a supplied function to each batch of updates.
118 ///
119 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
120 /// timely dataflow capability associated with the batch of updates. The observed batching depends
121 /// on how the system executes, and may vary run to run.
122 ///
123 /// # Examples
124 ///
125 /// ```
126 /// use differential_dataflow::input::Input;
127 ///
128 /// ::timely::example(|scope| {
129 /// scope.new_collection_from(1 .. 10).1
130 /// .map_in_place(|x| *x *= 2)
131 /// .filter(|x| x % 2 == 1)
132 /// .inspect_container(|event| println!("event: {:?}", event));
133 /// });
134 /// ```
135 pub fn inspect_container<F>(self, func: F) -> Self
136 where
137 F: FnMut(Result<(&G::Timestamp, &C), &[G::Timestamp]>)+'static,
138 {
139 self.inner
140 .inspect_container(func)
141 .as_collection()
142 }
143 /// Attaches a timely dataflow probe to the output of a Collection.
144 ///
145 /// This probe is used to determine when the state of the Collection has stabilized and can
146 /// be read out.
147 pub fn probe(self) -> (probe::Handle<G::Timestamp>, Self) {
148 let (handle, stream) = self.inner.probe();
149 (handle, stream.as_collection())
150 }
151 /// Attaches a timely dataflow probe to the output of a Collection.
152 ///
153 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
154 /// In addition, a probe is also often use to limit the number of rounds of input in flight at any moment; a
155 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
156 /// avoid swamping the system.
157 pub fn probe_with(self, handle: &probe::Handle<G::Timestamp>) -> Self {
158 Self::new(self.inner.probe_with(handle))
159 }
160 /// The scope containing the underlying timely dataflow stream.
161 pub fn scope(&self) -> G {
162 self.inner.scope()
163 }
164
165 /// Creates a new collection whose counts are the negation of those in the input.
166 ///
167 /// This method is most commonly used with `concat` to get those element in one collection but not another.
168 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
169 /// including negative counts.
170 ///
171 /// # Examples
172 ///
173 /// ```
174 /// use differential_dataflow::input::Input;
175 ///
176 /// ::timely::example(|scope| {
177 ///
178 /// let data = scope.new_collection_from(1 .. 10).1;
179 ///
180 /// let odds = data.clone().filter(|x| x % 2 == 1);
181 /// let evens = data.clone().filter(|x| x % 2 == 0);
182 ///
183 /// odds.negate()
184 /// .concat(data)
185 /// .assert_eq(evens);
186 /// });
187 /// ```
188 pub fn negate(self) -> Self where C: containers::Negate {
189 use timely::dataflow::channels::pact::Pipeline;
190 self.inner
191 .unary(Pipeline, "Negate", move |_,_| move |input, output| {
192 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
193 })
194 .as_collection()
195 }
196
197 /// Brings a Collection into a nested scope.
198 ///
199 /// # Examples
200 ///
201 /// ```
202 /// use timely::dataflow::Scope;
203 /// use differential_dataflow::input::Input;
204 ///
205 /// ::timely::example(|scope| {
206 ///
207 /// let data = scope.new_collection_from(1 .. 10).1;
208 ///
209 /// let result = scope.region(|child| {
210 /// data.clone()
211 /// .enter(child)
212 /// .leave()
213 /// });
214 ///
215 /// data.assert_eq(result);
216 /// });
217 /// ```
218 pub fn enter<'a, T>(self, child: &Child<'a, G, T>) -> Collection<Child<'a, G, T>, <C as containers::Enter<<G as ScopeParent>::Timestamp, T>>::InnerContainer>
219 where
220 C: containers::Enter<<G as ScopeParent>::Timestamp, T, InnerContainer: Container>,
221 T: Refines<<G as ScopeParent>::Timestamp>,
222 {
223 use timely::dataflow::channels::pact::Pipeline;
224 self.inner
225 .enter(child)
226 .unary(Pipeline, "Enter", move |_,_| move |input, output| {
227 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
228 })
229 .as_collection()
230 }
231
232 /// Advances a timestamp in the stream according to the timestamp actions on the path.
233 ///
234 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
235 /// incrementing fields would result in integer overflow. In this case, the record is dropped.
236 ///
237 /// # Examples
238 /// ```
239 /// use timely::dataflow::Scope;
240 /// use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::BranchWhen};
241 ///
242 /// use differential_dataflow::input::Input;
243 ///
244 /// timely::example(|scope| {
245 /// let summary1 = 5;
246 ///
247 /// let data = scope.new_collection_from(1 .. 10).1;
248 /// /// Applies `results_in` on every timestamp in the collection.
249 /// data.results_in(summary1);
250 /// });
251 /// ```
252 pub fn results_in(self, step: <G::Timestamp as Timestamp>::Summary) -> Self
253 where
254 C: containers::ResultsIn<<G::Timestamp as Timestamp>::Summary>,
255 {
256 use timely::dataflow::channels::pact::Pipeline;
257 self.inner
258 .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
259 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
260 })
261 .as_collection()
262 }
263}
264
265use timely::dataflow::scopes::ScopeParent;
266use timely::progress::timestamp::Refines;
267
268/// Methods requiring a nested scope.
269impl<'a, G: Scope, T: Timestamp, C: Container> Collection<Child<'a, G, T>, C>
270where
271 C: containers::Leave<T, G::Timestamp, OuterContainer: Container>,
272 T: Refines<<G as ScopeParent>::Timestamp>,
273{
274 /// Returns the final value of a Collection from a nested scope to its containing scope.
275 ///
276 /// # Examples
277 ///
278 /// ```
279 /// use timely::dataflow::Scope;
280 /// use differential_dataflow::input::Input;
281 ///
282 /// ::timely::example(|scope| {
283 ///
284 /// let data = scope.new_collection_from(1 .. 10).1;
285 ///
286 /// let result = scope.region(|child| {
287 /// data.clone()
288 /// .enter(child)
289 /// .leave()
290 /// });
291 ///
292 /// data.assert_eq(result);
293 /// });
294 /// ```
295 pub fn leave(self) -> Collection<G, <C as containers::Leave<T, G::Timestamp>>::OuterContainer> {
296 use timely::dataflow::channels::pact::Pipeline;
297 self.inner
298 .leave()
299 .unary(Pipeline, "Leave", move |_,_| move |input, output| {
300 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
301 })
302 .as_collection()
303 }
304}
305
306/// Methods requiring a region as the scope.
307impl<G: Scope, C: Container+Clone+'static> Collection<Child<'_, G, G::Timestamp>, C>
308{
309 /// Returns the value of a Collection from a nested region to its containing scope.
310 ///
311 /// This method is a specialization of `leave` to the case that of a nested region.
312 /// It removes the need for an operator that adjusts the timestamp.
313 pub fn leave_region(self) -> Collection<G, C> {
314 self.inner
315 .leave()
316 .as_collection()
317 }
318}
319
320pub use vec::Collection as VecCollection;
321/// Specializations of `Collection` that use `Vec` as the container.
322pub mod vec {
323
324 use std::hash::Hash;
325
326 use timely::progress::Timestamp;
327 use timely::order::Product;
328 use timely::dataflow::scopes::child::Iterative;
329 use timely::dataflow::{Scope, ScopeParent};
330 use timely::dataflow::operators::*;
331 use timely::dataflow::operators::vec::*;
332
333 use crate::collection::AsCollection;
334 use crate::difference::{Semigroup, Abelian, Multiply};
335 use crate::lattice::Lattice;
336 use crate::hashable::Hashable;
337
    /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
    ///
    /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
    /// differential dataflow computation, you write as if the collection is a static dataset to which you
    /// apply functional transformations, creating new collections. Once your computation is written, you
    /// are able to mutate the collection (by inserting and removing elements); differential dataflow will
    /// propagate changes through your functional computation and report the corresponding changes to the
    /// output collections.
    ///
    /// Each vec collection has three generic parameters. The parameter `G` is for the scope in which the
    /// collection exists; as you write more complicated programs you may wish to introduce nested scopes
    /// (e.g. for iteration) and this parameter tracks the scope (for timely dataflow's benefit). The `D`
    /// parameter is the type of data in your collection, for example `String`, or `(u32, Vec<Option<()>>)`.
    /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
    /// defaults to) `isize`, representing changes to the occurrence count of each record.
    ///
    /// This type definition instantiates the general [`Collection`](super::Collection) type with a
    /// `Vec<(D, G::Timestamp, R)>` container: each update is a `(data, time, diff)` triple.
    pub type Collection<G, D, R = isize> = super::Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
356
357
358 impl<G: Scope, D: Clone+'static, R: Clone+'static> Collection<G, D, R> {
359 /// Creates a new collection by applying the supplied function to each input element.
360 ///
361 /// # Examples
362 ///
363 /// ```
364 /// use differential_dataflow::input::Input;
365 ///
366 /// ::timely::example(|scope| {
367 /// scope.new_collection_from(1 .. 10).1
368 /// .map(|x| x * 2)
369 /// .filter(|x| x % 2 == 1)
370 /// .assert_empty();
371 /// });
372 /// ```
373 pub fn map<D2, L>(self, mut logic: L) -> Collection<G, D2, R>
374 where
375 D2: Clone+'static,
376 L: FnMut(D) -> D2 + 'static,
377 {
378 self.inner
379 .map(move |(data, time, delta)| (logic(data), time, delta))
380 .as_collection()
381 }
382 /// Creates a new collection by applying the supplied function to each input element.
383 ///
384 /// Although the name suggests in-place mutation, this function does not change the source collection,
385 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
386 /// equivalent to `map`, but can be more efficient.
387 ///
388 /// # Examples
389 ///
390 /// ```
391 /// use differential_dataflow::input::Input;
392 ///
393 /// ::timely::example(|scope| {
394 /// scope.new_collection_from(1 .. 10).1
395 /// .map_in_place(|x| *x *= 2)
396 /// .filter(|x| x % 2 == 1)
397 /// .assert_empty();
398 /// });
399 /// ```
400 pub fn map_in_place<L>(self, mut logic: L) -> Collection<G, D, R>
401 where
402 L: FnMut(&mut D) + 'static,
403 {
404 self.inner
405 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
406 .as_collection()
407 }
408 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
409 ///
410 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
411 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
412 /// attempting to consolidate the results.
413 ///
414 /// # Examples
415 ///
416 /// ```
417 /// use differential_dataflow::input::Input;
418 ///
419 /// ::timely::example(|scope| {
420 /// scope.new_collection_from(1 .. 10).1
421 /// .flat_map(|x| 0 .. x);
422 /// });
423 /// ```
424 pub fn flat_map<I, L>(self, mut logic: L) -> Collection<G, I::Item, R>
425 where
426 G::Timestamp: Clone,
427 I: IntoIterator<Item: Clone+'static>,
428 L: FnMut(D) -> I + 'static,
429 {
430 self.inner
431 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
432 .as_collection()
433 }
434 /// Creates a new collection containing those input records satisfying the supplied predicate.
435 ///
436 /// # Examples
437 ///
438 /// ```
439 /// use differential_dataflow::input::Input;
440 ///
441 /// ::timely::example(|scope| {
442 /// scope.new_collection_from(1 .. 10).1
443 /// .map(|x| x * 2)
444 /// .filter(|x| x % 2 == 1)
445 /// .assert_empty();
446 /// });
447 /// ```
448 pub fn filter<L>(self, mut logic: L) -> Collection<G, D, R>
449 where
450 L: FnMut(&D) -> bool + 'static,
451 {
452 self.inner
453 .filter(move |(data, _, _)| logic(data))
454 .as_collection()
455 }
456 /// Replaces each record with another, with a new difference type.
457 ///
458 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
459 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
460 ///
461 /// # Examples
462 ///
463 /// ```
464 /// use differential_dataflow::input::Input;
465 ///
466 /// ::timely::example(|scope| {
467 ///
468 /// let nums = scope.new_collection_from(0 .. 10).1;
469 /// let x1 = nums.clone().flat_map(|x| 0 .. x);
470 /// let x2 = nums.map(|x| (x, 9 - x))
471 /// .explode(|(x,y)| Some((x,y)));
472 ///
473 /// x1.assert_eq(x2);
474 /// });
475 /// ```
476 pub fn explode<D2, R2, I, L>(self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
477 where
478 D2: Clone+'static,
479 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
480 I: IntoIterator<Item=(D2,R2)>,
481 L: FnMut(D)->I+'static,
482 {
483 self.inner
484 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
485 .as_collection()
486 }
487
488 /// Joins each record against a collection defined by the function `logic`.
489 ///
490 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
491 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
492 /// modifications made to the results, namely joining timestamps and multiplying differences.
493 ///
494 /// #Examples
495 ///
496 /// ```
497 /// use differential_dataflow::input::Input;
498 ///
499 /// ::timely::example(|scope| {
500 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
501 /// // for x from 0 through 9.
502 /// scope.new_collection_from(0 .. 10isize).1
503 /// .join_function(|x|
504 /// // data time diff
505 /// vec![(2*x, (3*x) as u64, x),
506 /// (2*x, (4*x) as u64, -x)]
507 /// );
508 /// });
509 /// ```
510 pub fn join_function<D2, R2, I, L>(self, mut logic: L) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
511 where
512 G::Timestamp: Lattice,
513 D2: Clone+'static,
514 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
515 I: IntoIterator<Item=(D2,G::Timestamp,R2)>,
516 L: FnMut(D)->I+'static,
517 {
518 self.inner
519 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
520 .as_collection()
521 }
522
523 /// Brings a Collection into a nested scope, at varying times.
524 ///
525 /// The `initial` function indicates the time at which each element of the Collection should appear.
526 ///
527 /// # Examples
528 ///
529 /// ```
530 /// use timely::dataflow::Scope;
531 /// use differential_dataflow::input::Input;
532 ///
533 /// ::timely::example(|scope| {
534 ///
535 /// let data = scope.new_collection_from(1 .. 10).1;
536 ///
537 /// let result = scope.iterative::<u64,_,_>(|child| {
538 /// data.clone()
539 /// .enter_at(child, |x| *x)
540 /// .leave()
541 /// });
542 ///
543 /// data.assert_eq(result);
544 /// });
545 /// ```
546 pub fn enter_at<'a, T, F>(self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
547 where
548 T: Timestamp+Hash,
549 F: FnMut(&D) -> T + Clone + 'static,
550 G::Timestamp: Hash,
551 {
552 self.inner
553 .enter(child)
554 .map(move |(data, time, diff)| {
555 let new_time = Product::new(time, initial(&data));
556 (data, new_time, diff)
557 })
558 .as_collection()
559 }
560
561 /// Delays each difference by a supplied function.
562 ///
563 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
564 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
565 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
566 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
567 /// to all of the data timestamps).
568 pub fn delay<F>(self, func: F) -> Collection<G, D, R>
569 where
570 G::Timestamp: Hash,
571 F: FnMut(&G::Timestamp) -> G::Timestamp + Clone + 'static,
572 {
573 let mut func1 = func.clone();
574 let mut func2 = func.clone();
575
576 self.inner
577 .delay_batch(move |x| func1(x))
578 .map_in_place(move |x| x.1 = func2(&x.1))
579 .as_collection()
580 }
581
582 /// Applies a supplied function to each update.
583 ///
584 /// This method is most commonly used to report information back to the user, often for debugging purposes.
585 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
586 /// not guarantee that it will be called as many times as you might expect.
587 ///
588 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
589 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
590 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
591 /// interesting and less intuitive, unfortunately.
592 ///
593 /// # Examples
594 ///
595 /// ```
596 /// use differential_dataflow::input::Input;
597 ///
598 /// ::timely::example(|scope| {
599 /// scope.new_collection_from(1 .. 10).1
600 /// .map_in_place(|x| *x *= 2)
601 /// .filter(|x| x % 2 == 1)
602 /// .inspect(|x| println!("error: {:?}", x));
603 /// });
604 /// ```
605 pub fn inspect<F>(self, func: F) -> Collection<G, D, R>
606 where
607 F: FnMut(&(D, G::Timestamp, R))+'static,
608 {
609 self.inner
610 .inspect(func)
611 .as_collection()
612 }
613 /// Applies a supplied function to each batch of updates.
614 ///
615 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
616 /// timely dataflow capability associated with the batch of updates. The observed batching depends
617 /// on how the system executes, and may vary run to run.
618 ///
619 /// # Examples
620 ///
621 /// ```
622 /// use differential_dataflow::input::Input;
623 ///
624 /// ::timely::example(|scope| {
625 /// scope.new_collection_from(1 .. 10).1
626 /// .map_in_place(|x| *x *= 2)
627 /// .filter(|x| x % 2 == 1)
628 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
629 /// });
630 /// ```
631 pub fn inspect_batch<F>(self, mut func: F) -> Collection<G, D, R>
632 where
633 F: FnMut(&G::Timestamp, &[(D, G::Timestamp, R)])+'static,
634 {
635 self.inner
636 .inspect_batch(move |time, data| func(time, data))
637 .as_collection()
638 }
639
640 /// Assert if the collection is ever non-empty.
641 ///
642 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
643 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
644 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
645 /// program should indicate that this assertion never found cause to complain.
646 ///
647 /// # Examples
648 ///
649 /// ```
650 /// use differential_dataflow::input::Input;
651 ///
652 /// ::timely::example(|scope| {
653 /// scope.new_collection_from(1 .. 10).1
654 /// .map(|x| x * 2)
655 /// .filter(|x| x % 2 == 1)
656 /// .assert_empty();
657 /// });
658 /// ```
659 pub fn assert_empty(self)
660 where
661 D: crate::ExchangeData+Hashable,
662 R: crate::ExchangeData+Hashable + Semigroup,
663 G::Timestamp: Lattice+Ord,
664 {
665 self.consolidate()
666 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
667 }
668 }
669
670 /// Methods requiring an Abelian difference, to support negation.
671 impl<G: Scope<Timestamp: Clone+'static>, D: Clone+'static, R: Abelian+'static> Collection<G, D, R> {
672 /// Assert if the collections are ever different.
673 ///
674 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
675 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
676 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
677 /// indicate that this assertion never found cause to complain.
678 ///
679 /// # Examples
680 ///
681 /// ```
682 /// use differential_dataflow::input::Input;
683 ///
684 /// ::timely::example(|scope| {
685 ///
686 /// let data = scope.new_collection_from(1 .. 10).1;
687 ///
688 /// let odds = data.clone().filter(|x| x % 2 == 1);
689 /// let evens = data.clone().filter(|x| x % 2 == 0);
690 ///
691 /// odds.concat(evens)
692 /// .assert_eq(data);
693 /// });
694 /// ```
695 pub fn assert_eq(self, other: Self)
696 where
697 D: crate::ExchangeData+Hashable,
698 R: crate::ExchangeData+Hashable,
699 G::Timestamp: Lattice+Ord,
700 {
701 self.negate()
702 .concat(other)
703 .assert_empty();
704 }
705 }
706
707 use crate::trace::{Trace, Builder};
708 use crate::operators::arrange::{Arranged, TraceAgent};
709
710 impl <G, K, V, R> Collection<G, (K, V), R>
711 where
712 G: Scope<Timestamp: Lattice+Ord>,
713 K: crate::ExchangeData+Hashable,
714 V: crate::ExchangeData,
715 R: crate::ExchangeData+Semigroup,
716 {
717 /// Applies a reduction function on records grouped by key.
718 ///
719 /// Input data must be structured as `(key, val)` pairs.
720 /// The user-supplied reduction function takes as arguments
721 ///
722 /// 1. a reference to the key,
723 /// 2. a reference to the slice of values and their accumulated updates,
724 /// 3. a mutuable reference to a vector to populate with output values and accumulated updates.
725 ///
726 /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
727 /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
728 /// `Ord` implementations.
729 ///
730 /// # Examples
731 ///
732 /// ```
733 /// use differential_dataflow::input::Input;
734 ///
735 /// ::timely::example(|scope| {
736 /// // report the smallest value for each group
737 /// scope.new_collection_from(1 .. 10).1
738 /// .map(|x| (x / 3, x))
739 /// .reduce(|_key, input, output| {
740 /// output.push((*input[0].0, 1))
741 /// });
742 /// });
743 /// ```
744 pub fn reduce<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, logic: L) -> Collection<G, (K, V2), R2>
745 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
746 self.reduce_named("Reduce", logic)
747 }
748
749 /// As `reduce` with the ability to name the operator.
750 pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
751 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
752 use crate::trace::implementations::{ValBuilder, ValSpine};
753
754 self.arrange_by_key_named(&format!("Arrange: {}", name))
755 .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>>(name, logic)
756 .as_collection(|k,v| (k.clone(), v.clone()))
757 }
758
759 /// Applies `reduce` to arranged data, and returns an arrangement of output data.
760 ///
761 /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
762 /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
763 ///
764 /// # Examples
765 ///
766 /// ```
767 /// use differential_dataflow::input::Input;
768 /// use differential_dataflow::trace::Trace;
769 /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
770 ///
771 /// ::timely::example(|scope| {
772 ///
773 /// let trace =
774 /// scope.new_collection_from(1 .. 10u32).1
775 /// .map(|x| (x, x))
776 /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
777 /// "Example",
778 /// move |_key, src, dst| dst.push((*src[0].0, 1))
779 /// )
780 /// .trace;
781 /// });
782 /// ```
783 pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
784 where
785 T2: for<'a> Trace<Key<'a>= &'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp, Diff: Abelian>+'static,
786 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
787 L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
788 {
789 self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
790 if !input.is_empty() { logic(key, input, change); }
791 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
792 crate::consolidation::consolidate(change);
793 })
794 }
795
796 /// Solves for output updates when presented with inputs and would-be outputs.
797 ///
798 /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
799 /// and it may not be safe to index into the first element.
800 /// At least one of the two collections will be non-empty.
801 pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
802 where
803 V: Clone+'static,
804 T2: for<'a> Trace<Key<'a>=&'a K, KeyOwn = K, ValOwn = V, Time=G::Timestamp>+'static,
805 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
806 L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
807 {
808 self.arrange_by_key_named(&format!("Arrange: {}", name))
809 .reduce_core::<_,Bu,_>(name, logic)
810 }
811 }
812
813 impl<G, K, R1> Collection<G, K, R1>
814 where
815 G: Scope<Timestamp: Lattice+Ord>,
816 K: crate::ExchangeData+Hashable,
817 R1: crate::ExchangeData+Semigroup
818 {
819
820 /// Reduces the collection to one occurrence of each distinct element.
821 ///
822 /// # Examples
823 ///
824 /// ```
825 /// use differential_dataflow::input::Input;
826 ///
827 /// ::timely::example(|scope| {
828 /// // report at most one of each key.
829 /// scope.new_collection_from(1 .. 10).1
830 /// .map(|x| x / 3)
831 /// .distinct();
832 /// });
833 /// ```
834 pub fn distinct(self) -> Collection<G, K, isize> {
835 self.distinct_core()
836 }
837
838 /// Distinct for general integer differences.
839 ///
840 /// This method allows `distinct` to produce collections whose difference
841 /// type is something other than an `isize` integer, for example perhaps an
842 /// `i32`.
843 pub fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(self) -> Collection<G, K, R2> {
844 self.threshold_named("Distinct", |_,_| R2::from(1i8))
845 }
846
847 /// Transforms the multiplicity of records.
848 ///
849 /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
850 /// least the computation may behave as if it does. Otherwise, the transformation
851 /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
852 ///
853 /// # Examples
854 ///
855 /// ```
856 /// use differential_dataflow::input::Input;
857 ///
858 /// ::timely::example(|scope| {
859 /// // report at most one of each key.
860 /// scope.new_collection_from(1 .. 10).1
861 /// .map(|x| x / 3)
862 /// .threshold(|_,c| c % 2);
863 /// });
864 /// ```
865 pub fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(self, thresh: F) -> Collection<G, K, R2> {
866 self.threshold_named("Threshold", thresh)
867 }
868
869 /// A `threshold` with the ability to name the operator.
870 pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
871 use crate::trace::implementations::{KeyBuilder, KeySpine};
872
873 self.arrange_by_self_named(&format!("Arrange: {}", name))
874 .reduce_abelian::<_,KeyBuilder<K,G::Timestamp,R2>,KeySpine<K,G::Timestamp,R2>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
875 .as_collection(|k,_| k.clone())
876 }
877
878 }
879
880 impl<G, K, R> Collection<G, K, R>
881 where
882 G: Scope<Timestamp: Lattice+Ord>,
883 K: crate::ExchangeData+Hashable,
884 R: crate::ExchangeData+Semigroup
885 {
886
887 /// Counts the number of occurrences of each element.
888 ///
889 /// # Examples
890 ///
891 /// ```
892 /// use differential_dataflow::input::Input;
893 ///
894 /// ::timely::example(|scope| {
895 /// // report the number of occurrences of each key
896 /// scope.new_collection_from(1 .. 10).1
897 /// .map(|x| x / 3)
898 /// .count();
899 /// });
900 /// ```
901 pub fn count(self) -> Collection<G, (K, R), isize> { self.count_core() }
902
903 /// Count for general integer differences.
904 ///
905 /// This method allows `count` to produce collections whose difference
906 /// type is something other than an `isize` integer, for example perhaps an
907 /// `i32`.
908 pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<G, (K, R), R2> {
909 use crate::trace::implementations::{ValBuilder, ValSpine};
910 self.arrange_by_self_named("Arrange: Count")
911 .reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
912 .as_collection(|k,c| (k.clone(), c.clone()))
913 }
914 }
915
916 /// Methods which require data be arrangeable.
917 impl<G, D, R> Collection<G, D, R>
918 where
919 G: Scope<Timestamp: Clone+'static+Lattice>,
920 D: crate::ExchangeData+Hashable,
921 R: crate::ExchangeData+Semigroup,
922 {
923 /// Aggregates the weights of equal records into at most one record.
924 ///
925 /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
926 /// accumulated in place, each held back until their timestamp has completed.
927 ///
928 /// # Examples
929 ///
930 /// ```
931 /// use differential_dataflow::input::Input;
932 ///
933 /// ::timely::example(|scope| {
934 ///
935 /// let x = scope.new_collection_from(1 .. 10u32).1;
936 ///
937 /// x.clone()
938 /// .negate()
939 /// .concat(x)
940 /// .consolidate() // <-- ensures cancellation occurs
941 /// .assert_empty();
942 /// });
943 /// ```
944 pub fn consolidate(self) -> Self {
945 use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
946 self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
947 }
948
949 /// As `consolidate` but with the ability to name the operator, specify the trace type,
950 /// and provide the function `reify` to produce owned keys and values..
951 pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
952 where
953 Ba: crate::trace::Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
954 Tr: for<'a> crate::trace::Trace<Time=G::Timestamp,Diff=R>+'static,
955 Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
956 F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
957 {
958 use crate::operators::arrange::arrangement::Arrange;
959 self.map(|k| (k, ()))
960 .arrange_named::<Ba, Bu, Tr>(name)
961 .as_collection(reify)
962 }
963
964 /// Aggregates the weights of equal records.
965 ///
966 /// Unlike `consolidate`, this method does not exchange data and does not
967 /// ensure that at most one copy of each `(data, time)` pair exists in the
968 /// results. Instead, it acts on each batch of data and collapses equivalent
969 /// `(data, time)` pairs found therein, suppressing any that accumulate to
970 /// zero.
971 ///
972 /// # Examples
973 ///
974 /// ```
975 /// use differential_dataflow::input::Input;
976 ///
977 /// ::timely::example(|scope| {
978 ///
979 /// let x = scope.new_collection_from(1 .. 10u32).1;
980 ///
981 /// // nothing to assert, as no particular guarantees.
982 /// x.clone()
983 /// .negate()
984 /// .concat(x)
985 /// .consolidate_stream();
986 /// });
987 /// ```
988 pub fn consolidate_stream(self) -> Self {
989
990 use timely::dataflow::channels::pact::Pipeline;
991 use timely::dataflow::operators::Operator;
992 use crate::collection::AsCollection;
993 use crate::consolidation::ConsolidatingContainerBuilder;
994
995 self.inner
996 .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
997
998 move |input, output| {
999 input.for_each(|time, data| {
1000 output.session_with_builder(&time).give_iterator(data.drain(..));
1001 })
1002 }
1003 })
1004 .as_collection()
1005 }
1006 }
1007
1008 use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
1009 use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
1010 use crate::operators::arrange::Arrange;
1011
1012 impl<G, K, V, R> Arrange<G, Vec<((K, V), G::Timestamp, R)>> for Collection<G, (K, V), R>
1013 where
1014 G: Scope<Timestamp: Lattice>,
1015 K: crate::ExchangeData + Hashable,
1016 V: crate::ExchangeData,
1017 R: crate::ExchangeData + Semigroup,
1018 {
1019 fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
1020 where
1021 Ba: crate::trace::Batcher<Input=Vec<((K, V), G::Timestamp, R)>, Time=G::Timestamp> + 'static,
1022 Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
1023 Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
1024 {
1025 let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
1026 crate::operators::arrange::arrangement::arrange_core::<_, _, Ba, Bu, _>(self.inner, exchange, name)
1027 }
1028 }
1029
1030 impl<G, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<G, Vec<((K, ()), G::Timestamp, R)>> for Collection<G, K, R>
1031 where
1032 G: Scope<Timestamp: Lattice+Ord>,
1033 {
1034 fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
1035 where
1036 Ba: crate::trace::Batcher<Input=Vec<((K,()),G::Timestamp,R)>, Time=G::Timestamp> + 'static,
1037 Bu: crate::trace::Builder<Time=G::Timestamp, Input=Ba::Output, Output = Tr::Batch>,
1038 Tr: crate::trace::Trace<Time=G::Timestamp> + 'static,
1039 {
1040 let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
1041 crate::operators::arrange::arrangement::arrange_core::<_,_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
1042 }
1043 }
1044
1045
1046 impl<G, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<G, (K,V), R>
1047 where
1048 G: Scope<Timestamp: Lattice+Ord>,
1049 {
1050 /// Arranges a collection of `(Key, Val)` records by `Key`.
1051 ///
1052 /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
1053 /// This trace is current for all times completed by the output stream, which can be used to
1054 /// safely identify the stable times and values in the trace.
1055 pub fn arrange_by_key(self) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
1056 self.arrange_by_key_named("ArrangeByKey")
1057 }
1058
1059 /// As `arrange_by_key` but with the ability to name the arrangement.
1060 pub fn arrange_by_key_named(self, name: &str) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>> {
1061 self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
1062 }
1063 }
1064
1065 impl<G, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<G, K, R>
1066 where
1067 G: Scope<Timestamp: Lattice+Ord>,
1068 {
1069 /// Arranges a collection of `Key` records by `Key`.
1070 ///
1071 /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
1072 /// This trace is current for all times complete in the output stream, which can be used to safely
1073 /// identify the stable times and values in the trace.
1074 pub fn arrange_by_self(self) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
1075 self.arrange_by_self_named("ArrangeBySelf")
1076 }
1077
1078 /// As `arrange_by_self` but with the ability to name the arrangement.
1079 pub fn arrange_by_self_named(self, name: &str) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>> {
1080 self.map(|k| (k, ()))
1081 .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
1082 }
1083 }
1084
1085 impl<G, K, V, R> Collection<G, (K, V), R>
1086 where
1087 G: Scope<Timestamp: Lattice+Ord>,
1088 K: crate::ExchangeData+Hashable,
1089 V: crate::ExchangeData,
1090 R: crate::ExchangeData+Semigroup,
1091 {
1092 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1093 ///
1094 /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
1095 ///
1096 /// # Examples
1097 ///
1098 /// ```
1099 /// use differential_dataflow::input::Input;
1100 ///
1101 /// ::timely::example(|scope| {
1102 ///
1103 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1104 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1105 /// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1106 ///
1107 /// x.join(y)
1108 /// .assert_eq(z);
1109 /// });
1110 /// ```
1111 pub fn join<V2, R2>(self, other: Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
1112 where
1113 K: crate::ExchangeData,
1114 V2: crate::ExchangeData,
1115 R2: crate::ExchangeData+Semigroup,
1116 R: Multiply<R2, Output: Semigroup+'static>,
1117 {
1118 self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1119 }
1120
1121 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1122 ///
1123 /// # Examples
1124 ///
1125 /// ```
1126 /// use differential_dataflow::input::Input;
1127 ///
1128 /// ::timely::example(|scope| {
1129 ///
1130 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1131 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1132 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1133 ///
1134 /// x.join_map(y, |_key, &a, &b| (a,b))
1135 /// .assert_eq(z);
1136 /// });
1137 /// ```
1138 pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(self, other: Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
1139 where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1140 let arranged1 = self.arrange_by_key();
1141 let arranged2 = other.arrange_by_key();
1142 arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1143 }
1144
1145 /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1146 ///
1147 /// When the second collection contains frequencies that are either zero or one this is the more traditional
1148 /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1149 /// the counts of the records in the first input.
1150 ///
1151 /// # Examples
1152 ///
1153 /// ```
1154 /// use differential_dataflow::input::Input;
1155 ///
1156 /// ::timely::example(|scope| {
1157 ///
1158 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1159 /// let y = scope.new_collection_from(vec![0, 2]).1;
1160 /// let z = scope.new_collection_from(vec![(0, 1)]).1;
1161 ///
1162 /// x.semijoin(y)
1163 /// .assert_eq(z);
1164 /// });
1165 /// ```
1166 pub fn semijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
1167 where R: Multiply<R2, Output: Semigroup+'static> {
1168 let arranged1 = self.arrange_by_key();
1169 let arranged2 = other.arrange_by_self();
1170 arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone())))
1171 }
1172
1173 /// Subtracts the semijoin with `other` from `self`.
1174 ///
1175 /// In the case that `other` has multiplicities zero or one this results
1176 /// in a relational antijoin, in which we discard input records whose key
1177 /// is present in `other`. If the multiplicities could be other than zero
1178 /// or one, the semantic interpretation of this operator is less clear.
1179 ///
1180 /// In almost all cases, you should ensure that `other` has multiplicities
1181 /// that are zero or one, perhaps by using the `distinct` operator.
1182 ///
1183 /// # Examples
1184 ///
1185 /// ```
1186 /// use differential_dataflow::input::Input;
1187 ///
1188 /// ::timely::example(|scope| {
1189 ///
1190 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1191 /// let y = scope.new_collection_from(vec![0, 2]).1;
1192 /// let z = scope.new_collection_from(vec![(1, 3)]).1;
1193 ///
1194 /// x.antijoin(y)
1195 /// .assert_eq(z);
1196 /// });
1197 /// ```
1198 pub fn antijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<G, K, R2>) -> Collection<G, (K, V), R>
1199 where R: Multiply<R2, Output=R>, R: Abelian+'static {
1200 self.clone().concat(self.semijoin(other).negate())
1201 }
1202
1203 /// Joins two arranged collections with the same key type.
1204 ///
1205 /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1206 /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1207 /// every value returned by the iterator.
1208 ///
1209 /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
1210 /// contains the implementations for collections.
1211 ///
1212 /// # Examples
1213 ///
1214 /// ```
1215 /// use differential_dataflow::input::Input;
1216 /// use differential_dataflow::trace::Trace;
1217 ///
1218 /// ::timely::example(|scope| {
1219 ///
1220 /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1221 /// .arrange_by_key();
1222 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1223 /// .arrange_by_key();
1224 ///
1225 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1226 ///
1227 /// x.join_core(y, |_key, &a, &b| Some((a, b)))
1228 /// .assert_eq(z);
1229 /// });
1230 /// ```
1231 pub fn join_core<Tr2,I,L> (self, stream2: Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1232 where
1233 Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
1234 R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1235 I: IntoIterator<Item: crate::Data>,
1236 L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1237 {
1238 self.arrange_by_key()
1239 .join_core(stream2, result)
1240 }
1241 }
1242}
1243
1244/// Conversion to a differential dataflow Collection.
1245pub trait AsCollection<G: Scope, C> {
1246 /// Converts the type to a differential dataflow collection.
1247 fn as_collection(self) -> Collection<G, C>;
1248}
1249
1250impl<G: Scope, C> AsCollection<G, C> for Stream<G, C> {
1251 /// Converts the type to a differential dataflow collection.
1252 ///
1253 /// By calling this method, you guarantee that the timestamp invariant (as documented on
1254 /// [Collection]) is upheld. This method will not check it.
1255 fn as_collection(self) -> Collection<G, C> {
1256 Collection::<G,C>::new(self)
1257 }
1258}
1259
1260/// Concatenates multiple collections.
1261///
1262/// This method has the effect of a sequence of calls to `concat`, but it does
1263/// so in one operator rather than a chain of many operators.
1264///
1265/// # Examples
1266///
1267/// ```
1268/// use differential_dataflow::input::Input;
1269///
1270/// ::timely::example(|scope| {
1271///
1272/// let data = scope.new_collection_from(1 .. 10).1;
1273///
1274/// let odds = data.clone().filter(|x| x % 2 == 1);
1275/// let evens = data.clone().filter(|x| x % 2 == 0);
1276///
1277/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
1278/// .assert_eq(data);
1279/// });
1280/// ```
1281pub fn concatenate<G, C, I>(scope: &mut G, iterator: I) -> Collection<G, C>
1282where
1283 G: Scope,
1284 C: Container,
1285 I: IntoIterator<Item=Collection<G, C>>,
1286{
1287 scope
1288 .concatenate(iterator.into_iter().map(|x| x.inner))
1289 .as_collection()
1290}
1291
1292/// Traits that can be implemented by containers to provide functionality to collections based on them.
1293pub mod containers {
1294
/// A container whose updates can be negated.
pub trait Negate {
    /// Consumes the container and returns it with the (Abelian) difference
    /// of every update negated.
    fn negate(self) -> Self;
}
1300
/// A container that can enter from timestamp `T1` into timestamp `T2`.
pub trait Enter<T1, T2> {
    /// The container type produced by entering.
    type InnerContainer;
    /// Consumes the container, converting its timestamps from `T1` to `T2`.
    fn enter(self) -> Self::InnerContainer;
}
1308
/// A container that can leave from timestamp `T1` into timestamp `T2`.
pub trait Leave<T1, T2> {
    /// The container type produced by leaving.
    type OuterContainer;
    /// Consumes the container, converting its timestamps from `T1` to `T2`.
    fn leave(self) -> Self::OuterContainer;
}
1316
/// A container whose timestamps can be advanced by a summary `TS`.
pub trait ResultsIn<TS> {
    /// Consumes the container and returns it with its times advanced by `step`.
    fn results_in(self, step: &TS) -> Self;
}
1322
1323
1324 /// Implementations of container traits for the `Vec` container.
1325 mod vec {
1326
1327 use timely::progress::{Timestamp, timestamp::Refines};
1328 use crate::collection::Abelian;
1329
1330 use super::{Negate, Enter, Leave, ResultsIn};
1331
1332 impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
1333 fn negate(mut self) -> Self {
1334 for (_data, _time, diff) in self.iter_mut() { diff.negate(); }
1335 self
1336 }
1337 }
1338
1339 impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
1340 type InnerContainer = Vec<(D, T2, R)>;
1341 fn enter(self) -> Self::InnerContainer {
1342 self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
1343 }
1344 }
1345
1346 impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
1347 type OuterContainer = Vec<(D, T2, R)>;
1348 fn leave(self) -> Self::OuterContainer {
1349 self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
1350 }
1351 }
1352
1353 impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
1354 fn results_in(self, step: &T::Summary) -> Self {
1355 use timely::progress::PathSummary;
1356 self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
1357 }
1358 }
1359 }
1360
1361 /// Implementations of container traits for the `Rc` container.
1362 mod rc {
1363 use std::rc::Rc;
1364
1365 use timely::progress::{Timestamp, timestamp::Refines};
1366
1367 use super::{Negate, Enter, Leave, ResultsIn};
1368
1369 impl<C: Negate+Clone+Default> Negate for Rc<C> {
1370 fn negate(mut self) -> Self {
1371 std::mem::take(Rc::make_mut(&mut self)).negate().into()
1372 }
1373 }
1374
1375 impl<C: Enter<T1, T2>+Clone+Default, T1: Timestamp, T2: Refines<T1>> Enter<T1, T2> for Rc<C> {
1376 type InnerContainer = Rc<C::InnerContainer>;
1377 fn enter(mut self) -> Self::InnerContainer {
1378 std::mem::take(Rc::make_mut(&mut self)).enter().into()
1379 }
1380 }
1381
1382 impl<C: Leave<T1, T2>+Clone+Default, T1: Refines<T2>, T2: Timestamp> Leave<T1, T2> for Rc<C> {
1383 type OuterContainer = Rc<C::OuterContainer>;
1384 fn leave(mut self) -> Self::OuterContainer {
1385 std::mem::take(Rc::make_mut(&mut self)).leave().into()
1386 }
1387 }
1388
1389 impl<C: ResultsIn<TS>+Clone+Default, TS> ResultsIn<TS> for Rc<C> {
1390 fn results_in(mut self, step: &TS) -> Self {
1391 std::mem::take(Rc::make_mut(&mut self)).results_in(step).into()
1392 }
1393 }
1394 }
1395}