differential_dataflow/collection.rs
1//! Types and traits associated with collections of data.
2//!
3//! The `Collection` type is differential dataflow's core abstraction for an updatable pile of data.
4//!
5//! Most differential dataflow programs are "collection-oriented", in the sense that they transform
6//! one collection into another, using operators defined on collections. This contrasts with a more
7//! imperative programming style, in which one might iterate through the contents of a collection
//! manually. This higher-level style of programming allows differential dataflow to provide efficient
9//! implementations, and to support efficient incremental updates to the collections.
10
11use timely::Container;
12use timely::progress::Timestamp;
13use timely::dataflow::{Scope, Stream};
14use timely::dataflow::operators::*;
15
16use crate::difference::Abelian;
17
/// An evolving collection represented by a stream of abstract containers.
///
/// The containers purport to represent changes to a collection, and they must implement various traits
/// in order to expose some of this functionality (e.g. negation, timestamp manipulation). Other actions
/// on the containers, and streams of containers, are left to the container implementor to describe.
#[derive(Clone)]
pub struct Collection<'scope, T: Timestamp, C: 'static> {
    /// The underlying timely dataflow stream.
    ///
    /// This field is exposed to support direct timely dataflow manipulation when required, but it is
    /// not intended to be the idiomatic way to work with the collection.
    ///
    /// The timestamp in the data is required to always be at least the timestamp _of_ the data, in
    /// the timely-dataflow sense. If this invariant is not upheld, differential operators may behave
    /// unexpectedly. Because the field is public, nothing in this type enforces the invariant; it is
    /// the responsibility of whoever produces or replaces the stream.
    pub inner: Stream<'scope, T, C>,
}
35
36impl<'scope, T: Timestamp, C> Collection<'scope, T, C> {
37 /// Creates a new Collection from a timely dataflow stream.
38 ///
39 /// This method seems to be rarely used, with the `as_collection` method on streams being a more
40 /// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
41 /// provides a `new_collection` method which will create a new collection for you without exposing
42 /// the underlying timely stream at all.
43 ///
44 /// This stream should satisfy the timestamp invariant as documented on [Collection]; this
45 /// method does not check it.
46 pub fn new(stream: Stream<'scope, T, C>) -> Self { Self { inner: stream } }
47}
48impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C> {
49 /// Creates a new collection accumulating the contents of the two collections.
50 ///
51 /// Despite the name, differential dataflow collections are unordered. This method is so named because the
52 /// implementation is the concatenation of the stream of updates, but it corresponds to the addition of the
53 /// two collections.
54 ///
55 /// # Examples
56 ///
57 /// ```
58 /// use differential_dataflow::input::Input;
59 ///
60 /// ::timely::example(|scope| {
61 ///
62 /// let data = scope.new_collection_from(1 .. 10).1;
63 ///
64 /// let odds = data.clone().filter(|x| x % 2 == 1);
65 /// let evens = data.clone().filter(|x| x % 2 == 0);
66 ///
67 /// odds.concat(evens)
68 /// .assert_eq(data);
69 /// });
70 /// ```
71 pub fn concat(self, other: Self) -> Self {
72 self.inner
73 .concat(other.inner)
74 .as_collection()
75 }
    /// Creates a new collection accumulating the contents of this collection and all of `sources`.
    ///
    /// Despite the name, differential dataflow collections are unordered. This method is so named because the
    /// implementation is the concatenation of the streams of updates, but it corresponds to the addition of
    /// all of the collections involved.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// use differential_dataflow::input::Input;
86 ///
87 /// ::timely::example(|scope| {
88 ///
89 /// let data = scope.new_collection_from(1 .. 10).1;
90 ///
91 /// let odds = data.clone().filter(|x| x % 2 == 1);
92 /// let evens = data.clone().filter(|x| x % 2 == 0);
93 ///
94 /// odds.concatenate(Some(evens))
95 /// .assert_eq(data);
96 /// });
97 /// ```
98 pub fn concatenate<I>(self, sources: I) -> Self
99 where
100 I: IntoIterator<Item=Self>
101 {
102 self.inner
103 .scope()
104 .concatenate(sources.into_iter().map(|x| x.inner).chain([self.inner]))
105 .as_collection()
106 }
    /// Brings a Collection into a nested region.
108 ///
109 /// This method is a specialization of `enter` to the case where the nested scope is a region.
110 /// It removes the need for an operator that adjusts the timestamp.
111 pub fn enter_region<'inner>(self, child: Scope<'inner, T>) -> Collection<'inner, T, C> {
112 self.inner
113 .enter(child)
114 .as_collection()
115 }
116 /// Applies a supplied function to each batch of updates.
117 ///
118 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
119 /// timely dataflow capability associated with the batch of updates. The observed batching depends
120 /// on how the system executes, and may vary run to run.
121 ///
122 /// # Examples
123 ///
124 /// ```
125 /// use differential_dataflow::input::Input;
126 ///
127 /// ::timely::example(|scope| {
128 /// scope.new_collection_from(1 .. 10).1
129 /// .map_in_place(|x| *x *= 2)
130 /// .filter(|x| x % 2 == 1)
131 /// .inspect_container(|event| println!("event: {:?}", event));
132 /// });
133 /// ```
134 pub fn inspect_container<F>(self, func: F) -> Self
135 where
136 F: FnMut(Result<(&T, &C), &[T]>)+'static,
137 {
138 self.inner
139 .inspect_container(func)
140 .as_collection()
141 }
142 /// Attaches a timely dataflow probe to the output of a Collection.
143 ///
144 /// This probe is used to determine when the state of the Collection has stabilized and can
145 /// be read out.
146 pub fn probe(self) -> (probe::Handle<T>, Self) {
147 let (handle, stream) = self.inner.probe();
148 (handle, stream.as_collection())
149 }
150 /// Attaches a timely dataflow probe to the output of a Collection.
151 ///
152 /// This probe is used to determine when the state of the Collection has stabilized and all updates observed.
    /// In addition, a probe is also often used to limit the number of rounds of input in flight at any moment; a
154 /// computation can wait until the probe has caught up to the input before introducing more rounds of data, to
155 /// avoid swamping the system.
156 pub fn probe_with(self, handle: &probe::Handle<T>) -> Self {
157 Self::new(self.inner.probe_with(handle))
158 }
159 /// The scope containing the underlying timely dataflow stream.
160 pub fn scope(&self) -> Scope<'scope, T> {
161 self.inner.scope()
162 }
163
164 /// Creates a new collection whose counts are the negation of those in the input.
165 ///
    /// This method is most commonly used with `concat` to get those elements in one collection but not another.
167 /// However, differential dataflow computations are still defined for all values of the difference type `R`,
168 /// including negative counts.
169 ///
170 /// # Examples
171 ///
172 /// ```
173 /// use differential_dataflow::input::Input;
174 ///
175 /// ::timely::example(|scope| {
176 ///
177 /// let data = scope.new_collection_from(1 .. 10).1;
178 ///
179 /// let odds = data.clone().filter(|x| x % 2 == 1);
180 /// let evens = data.clone().filter(|x| x % 2 == 0);
181 ///
182 /// odds.negate()
183 /// .concat(data)
184 /// .assert_eq(evens);
185 /// });
186 /// ```
187 pub fn negate(self) -> Self where C: containers::Negate {
188 use timely::dataflow::channels::pact::Pipeline;
189 self.inner
190 .unary(Pipeline, "Negate", move |_,_| move |input, output| {
191 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).negate()));
192 })
193 .as_collection()
194 }
195
196 /// Brings a Collection into a nested scope.
197 ///
198 /// # Examples
199 ///
200 /// ```
201 /// use timely::dataflow::Scope;
202 /// use differential_dataflow::input::Input;
203 ///
204 /// ::timely::example(|scope| {
205 ///
206 /// let data = scope.new_collection_from(1 .. 10).1;
207 ///
208 /// let result = scope.region(|child| {
209 /// data.clone()
210 /// .enter(child)
211 /// .leave(scope)
212 /// });
213 ///
214 /// data.assert_eq(result);
215 /// });
216 /// ```
217 pub fn enter<'inner, TInner>(self, child: Scope<'inner, TInner>) -> Collection<'inner, TInner, <C as containers::Enter<T, TInner>>::InnerContainer>
218 where
219 C: containers::Enter<T, TInner, InnerContainer: Container>,
220 TInner: Refines<T>,
221 {
222 use timely::dataflow::channels::pact::Pipeline;
223 self.inner
224 .enter(child)
225 .unary(Pipeline, "Enter", move |_,_| move |input, output| {
226 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).enter()));
227 })
228 .as_collection()
229 }
230
231 /// Advances a timestamp in the stream according to the timestamp actions on the path.
232 ///
233 /// The path may advance the timestamp sufficiently that it is no longer valid, for example if
234 /// incrementing fields would result in integer overflow. In this case, the record is dropped.
235 ///
236 /// # Examples
237 /// ```
238 /// use timely::dataflow::Scope;
239 /// use timely::dataflow::operators::{ToStream, Concat, Inspect, vec::BranchWhen};
240 ///
241 /// use differential_dataflow::input::Input;
242 ///
243 /// timely::example(|scope| {
244 /// let summary1 = 5;
245 ///
246 /// let data = scope.new_collection_from(1 .. 10).1;
    ///     // Applies `results_in` on every timestamp in the collection.
248 /// data.results_in(summary1);
249 /// });
250 /// ```
251 pub fn results_in(self, step: T::Summary) -> Self
252 where
253 C: containers::ResultsIn<T::Summary>,
254 {
255 use timely::dataflow::channels::pact::Pipeline;
256 self.inner
257 .unary(Pipeline, "ResultsIn", move |_,_| move |input, output| {
258 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).results_in(&step)));
259 })
260 .as_collection()
261 }
262}
263
264use timely::progress::timestamp::Refines;
265
266/// Methods requiring a nested scope.
267impl<'scope, T: Timestamp, C: Container> Collection<'scope, T, C>
268{
269 /// Returns the final value of a Collection from a nested scope to its containing scope.
270 ///
271 /// # Examples
272 ///
273 /// ```
274 /// use timely::dataflow::Scope;
275 /// use differential_dataflow::input::Input;
276 ///
277 /// ::timely::example(|scope| {
278 ///
279 /// let data = scope.new_collection_from(1 .. 10).1;
280 ///
281 /// let result = scope.region(|child| {
282 /// data.clone()
283 /// .enter(child)
284 /// .leave(scope)
285 /// });
286 ///
287 /// data.assert_eq(result);
288 /// });
289 /// ```
290 pub fn leave<'outer, TOuter>(self, outer: Scope<'outer, TOuter>) -> Collection<'outer, TOuter, <C as containers::Leave<T, TOuter>>::OuterContainer>
291 where
292 TOuter: Timestamp,
293 T: Refines<TOuter>,
294 C: containers::Leave<T, TOuter, OuterContainer: Container>,
295 {
296 use timely::dataflow::channels::pact::Pipeline;
297 self.inner
298 .leave(outer)
299 .unary(Pipeline, "Leave", move |_,_| move |input, output| {
300 input.for_each(|time, data| output.session(&time).give_container(&mut std::mem::take(data).leave()));
301 })
302 .as_collection()
303 }
304
305 /// Returns the value of a Collection from a nested region to its containing scope.
306 ///
    /// This method is a specialization of `leave` to the case of a nested region.
308 /// It removes the need for an operator that adjusts the timestamp.
309 pub fn leave_region<'outer>(self, outer: Scope<'outer, T>) -> Collection<'outer, T, C> {
310 self.inner
311 .leave(outer)
312 .as_collection()
313 }
314}
315
316pub use vec::Collection as VecCollection;
317/// Specializations of `Collection` that use `Vec` as the container.
318pub mod vec {
319
320 use std::hash::Hash;
321
322 use timely::progress::Timestamp;
323 use timely::order::Product;
324 use timely::dataflow::scope::Iterative;
325 use timely::dataflow::operators::*;
326 use timely::dataflow::operators::vec::*;
327
328 use crate::collection::AsCollection;
329 use crate::difference::{Semigroup, Abelian, Multiply};
330 use crate::lattice::Lattice;
331 use crate::hashable::Hashable;
332
333 /// An evolving collection of values of type `D`, backed by Rust `Vec` types as containers.
334 ///
335 /// The `Collection` type is the core abstraction in differential dataflow programs. As you write your
336 /// differential dataflow computation, you write as if the collection is a static dataset to which you
337 /// apply functional transformations, creating new collections. Once your computation is written, you
338 /// are able to mutate the collection (by inserting and removing elements); differential dataflow will
339 /// propagate changes through your functional computation and report the corresponding changes to the
340 /// output collections.
341 ///
342 /// Each vec collection has three generic parameters. The parameter `T` is the timestamp type of the
343 /// scope in which the collection exists; as you write more complicated programs you may wish to
344 /// introduce nested scopes (e.g. for iteration), and this parameter tracks the scope's timestamp
345 /// (for timely dataflow's benefit). The `D` parameter is the type of data in your collection, for
346 /// example `String`, or `(u32, Vec<Option<()>>)`.
347 /// The `R` parameter represents the types of changes that the data undergo, and is most commonly (and
348 /// defaults to) `isize`, representing changes to the occurrence count of each record.
349 ///
350 /// This type definition instantiates the [`Collection`] type with a `Vec<(D, T, R)>`.
351 pub type Collection<'scope, T, D, R = isize> = super::Collection<'scope, T, Vec<(D, T, R)>>;
352
353
354 impl<'scope, T: Timestamp, D: Clone+'static, R: Clone+'static> Collection<'scope, T, D, R> {
355 /// Creates a new collection by applying the supplied function to each input element.
356 ///
357 /// # Examples
358 ///
359 /// ```
360 /// use differential_dataflow::input::Input;
361 ///
362 /// ::timely::example(|scope| {
363 /// scope.new_collection_from(1 .. 10).1
364 /// .map(|x| x * 2)
365 /// .filter(|x| x % 2 == 1)
366 /// .assert_empty();
367 /// });
368 /// ```
369 pub fn map<D2, L>(self, mut logic: L) -> Collection<'scope, T, D2, R>
370 where
371 D2: Clone+'static,
372 L: FnMut(D) -> D2 + 'static,
373 {
374 self.inner
375 .map(move |(data, time, delta)| (logic(data), time, delta))
376 .as_collection()
377 }
378 /// Creates a new collection by applying the supplied function to each input element.
379 ///
380 /// Although the name suggests in-place mutation, this function does not change the source collection,
381 /// but rather re-uses the underlying allocations in its implementation. The method is semantically
382 /// equivalent to `map`, but can be more efficient.
383 ///
384 /// # Examples
385 ///
386 /// ```
387 /// use differential_dataflow::input::Input;
388 ///
389 /// ::timely::example(|scope| {
390 /// scope.new_collection_from(1 .. 10).1
391 /// .map_in_place(|x| *x *= 2)
392 /// .filter(|x| x % 2 == 1)
393 /// .assert_empty();
394 /// });
395 /// ```
396 pub fn map_in_place<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
397 where
398 L: FnMut(&mut D) + 'static,
399 {
400 self.inner
401 .map_in_place(move |&mut (ref mut data, _, _)| logic(data))
402 .as_collection()
403 }
404 /// Creates a new collection by applying the supplied function to each input element and accumulating the results.
405 ///
406 /// This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be
407 /// warned that if the iterators produce substantial amounts of data, they are currently fully drained before
408 /// attempting to consolidate the results.
409 ///
410 /// # Examples
411 ///
412 /// ```
413 /// use differential_dataflow::input::Input;
414 ///
415 /// ::timely::example(|scope| {
416 /// scope.new_collection_from(1 .. 10).1
417 /// .flat_map(|x| 0 .. x);
418 /// });
419 /// ```
420 pub fn flat_map<I, L>(self, mut logic: L) -> Collection<'scope, T, I::Item, R>
421 where
422 T: Clone,
423 I: IntoIterator<Item: Clone+'static>,
424 L: FnMut(D) -> I + 'static,
425 {
426 self.inner
427 .flat_map(move |(data, time, delta)| logic(data).into_iter().map(move |x| (x, time.clone(), delta.clone())))
428 .as_collection()
429 }
430 /// Creates a new collection containing those input records satisfying the supplied predicate.
431 ///
432 /// # Examples
433 ///
434 /// ```
435 /// use differential_dataflow::input::Input;
436 ///
437 /// ::timely::example(|scope| {
438 /// scope.new_collection_from(1 .. 10).1
439 /// .map(|x| x * 2)
440 /// .filter(|x| x % 2 == 1)
441 /// .assert_empty();
442 /// });
443 /// ```
444 pub fn filter<L>(self, mut logic: L) -> Collection<'scope, T, D, R>
445 where
446 L: FnMut(&D) -> bool + 'static,
447 {
448 self.inner
449 .filter(move |(data, _, _)| logic(data))
450 .as_collection()
451 }
452 /// Replaces each record with another, with a new difference type.
453 ///
454 /// This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed)
455 /// and move the data into the difference component. This will allow differential dataflow to update in-place.
456 ///
457 /// # Examples
458 ///
459 /// ```
460 /// use differential_dataflow::input::Input;
461 ///
462 /// ::timely::example(|scope| {
463 ///
464 /// let nums = scope.new_collection_from(0 .. 10).1;
465 /// let x1 = nums.clone().flat_map(|x| 0 .. x);
466 /// let x2 = nums.map(|x| (x, 9 - x))
467 /// .explode(|(x,y)| Some((x,y)));
468 ///
469 /// x1.assert_eq(x2);
470 /// });
471 /// ```
472 pub fn explode<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
473 where
474 D2: Clone+'static,
475 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
476 I: IntoIterator<Item=(D2,R2)>,
477 L: FnMut(D)->I+'static,
478 {
479 self.inner
480 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,d2)| (x, t.clone(), d2.multiply(&d))))
481 .as_collection()
482 }
483
484 /// Joins each record against a collection defined by the function `logic`.
485 ///
486 /// This method performs what is essentially a join with the collection of records `(x, logic(x))`.
487 /// Rather than materialize this second relation, `logic` is applied to each record and the appropriate
488 /// modifications made to the results, namely joining timestamps and multiplying differences.
489 ///
490 /// #Examples
491 ///
492 /// ```
493 /// use differential_dataflow::input::Input;
494 ///
495 /// ::timely::example(|scope| {
496 /// // creates `x` copies of `2*x` from time `3*x` until `4*x`,
497 /// // for x from 0 through 9.
498 /// scope.new_collection_from(0 .. 10isize).1
499 /// .join_function(|x|
500 /// // data time diff
501 /// vec![(2*x, (3*x) as u64, x),
502 /// (2*x, (4*x) as u64, -x)]
503 /// );
504 /// });
505 /// ```
506 pub fn join_function<D2, R2, I, L>(self, mut logic: L) -> Collection<'scope, T, D2, <R2 as Multiply<R>>::Output>
507 where
508 T: Lattice,
509 D2: Clone+'static,
510 R2: Semigroup+Multiply<R, Output: Semigroup+'static>,
511 I: IntoIterator<Item=(D2,T,R2)>,
512 L: FnMut(D)->I+'static,
513 {
514 self.inner
515 .flat_map(move |(x, t, d)| logic(x).into_iter().map(move |(x,t2,d2)| (x, t.join(&t2), d2.multiply(&d))))
516 .as_collection()
517 }
518
519 /// Brings a Collection into a nested scope, at varying times.
520 ///
521 /// The `initial` function indicates the time at which each element of the Collection should appear.
522 ///
523 /// # Examples
524 ///
525 /// ```
526 /// use timely::dataflow::Scope;
527 /// use differential_dataflow::input::Input;
528 ///
529 /// ::timely::example(|scope| {
530 ///
531 /// let data = scope.new_collection_from(1 .. 10).1;
532 ///
533 /// let result = scope.iterative::<u64,_,_>(|child| {
534 /// data.clone()
535 /// .enter_at(child, |x| *x)
536 /// .leave(scope)
537 /// });
538 ///
539 /// data.assert_eq(result);
540 /// });
541 /// ```
542 pub fn enter_at<'inner, TInner, F>(self, child: Iterative<'inner, T, TInner>, mut initial: F) -> Collection<'inner, Product<T, TInner>, D, R>
543 where
544 TInner: Timestamp+Hash,
545 F: FnMut(&D) -> TInner + Clone + 'static,
546 {
547 self.inner
548 .enter(child)
549 .map(move |(data, time, diff)| {
550 let new_time = Product::new(time, initial(&data));
551 (data, new_time, diff)
552 })
553 .as_collection()
554 }
555
556 /// Delays each difference by a supplied function.
557 ///
558 /// It is assumed that `func` only advances timestamps; this is not verified, and things may go horribly
559 /// wrong if that assumption is incorrect. It is also critical that `func` be monotonic: if two times are
560 /// ordered, they should have the same order or compare equal once `func` is applied to them (this
561 /// is because we advance the timely capability with the same logic, and it must remain `less_equal`
562 /// to all of the data timestamps).
563 pub fn delay<F>(self, func: F) -> Collection<'scope, T, D, R>
564 where
565 T: Hash,
566 F: FnMut(&T) -> T + Clone + 'static,
567 {
568 let mut func1 = func.clone();
569 let mut func2 = func.clone();
570
571 self.inner
572 .delay_batch(move |x| func1(x))
573 .map_in_place(move |x| x.1 = func2(&x.1))
574 .as_collection()
575 }
576
577 /// Applies a supplied function to each update.
578 ///
579 /// This method is most commonly used to report information back to the user, often for debugging purposes.
580 /// Any function can be used here, but be warned that the incremental nature of differential dataflow does
581 /// not guarantee that it will be called as many times as you might expect.
582 ///
583 /// The `(data, time, diff)` triples indicate a change `diff` to the frequency of `data` which takes effect
584 /// at the logical time `time`. When times are totally ordered (for example, `usize`), these updates reflect
585 /// the changes along the sequence of collections. For partially ordered times, the mathematics are more
586 /// interesting and less intuitive, unfortunately.
587 ///
588 /// # Examples
589 ///
590 /// ```
591 /// use differential_dataflow::input::Input;
592 ///
593 /// ::timely::example(|scope| {
594 /// scope.new_collection_from(1 .. 10).1
595 /// .map_in_place(|x| *x *= 2)
596 /// .filter(|x| x % 2 == 1)
597 /// .inspect(|x| println!("error: {:?}", x));
598 /// });
599 /// ```
600 pub fn inspect<F>(self, func: F) -> Collection<'scope, T, D, R>
601 where
602 F: FnMut(&(D, T, R))+'static,
603 {
604 self.inner
605 .inspect(func)
606 .as_collection()
607 }
608 /// Applies a supplied function to each batch of updates.
609 ///
610 /// This method is analogous to `inspect`, but operates on batches and reveals the timestamp of the
611 /// timely dataflow capability associated with the batch of updates. The observed batching depends
612 /// on how the system executes, and may vary run to run.
613 ///
614 /// # Examples
615 ///
616 /// ```
617 /// use differential_dataflow::input::Input;
618 ///
619 /// ::timely::example(|scope| {
620 /// scope.new_collection_from(1 .. 10).1
621 /// .map_in_place(|x| *x *= 2)
622 /// .filter(|x| x % 2 == 1)
623 /// .inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
624 /// });
625 /// ```
626 pub fn inspect_batch<F>(self, mut func: F) -> Collection<'scope, T, D, R>
627 where
628 F: FnMut(&T, &[(D, T, R)])+'static,
629 {
630 self.inner
631 .inspect_batch(move |time, data| func(time, data))
632 .as_collection()
633 }
634
635 /// Assert if the collection is ever non-empty.
636 ///
637 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
638 /// is not run, or not run to completion, there may be un-exercised times at which the collection could be
639 /// non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a
640 /// program should indicate that this assertion never found cause to complain.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use differential_dataflow::input::Input;
646 ///
647 /// ::timely::example(|scope| {
648 /// scope.new_collection_from(1 .. 10).1
649 /// .map(|x| x * 2)
650 /// .filter(|x| x % 2 == 1)
651 /// .assert_empty();
652 /// });
653 /// ```
654 pub fn assert_empty(self)
655 where
656 D: crate::ExchangeData+Hashable,
657 R: crate::ExchangeData+Hashable + Semigroup,
658 T: Lattice+Ord,
659 {
660 self.consolidate()
661 .inspect(|x| panic!("Assertion failed: non-empty collection: {:?}", x));
662 }
663 }
664
665 /// Methods requiring an Abelian difference, to support negation.
666 impl<'scope, T: Timestamp + Clone + 'static, D: Clone+'static, R: Abelian+'static> Collection<'scope, T, D, R> {
667 /// Assert if the collections are ever different.
668 ///
669 /// Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation
670 /// is not run, or not run to completion, there may be un-exercised times at which the collections could vary.
671 /// Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should
672 /// indicate that this assertion never found cause to complain.
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// use differential_dataflow::input::Input;
678 ///
679 /// ::timely::example(|scope| {
680 ///
681 /// let data = scope.new_collection_from(1 .. 10).1;
682 ///
683 /// let odds = data.clone().filter(|x| x % 2 == 1);
684 /// let evens = data.clone().filter(|x| x % 2 == 0);
685 ///
686 /// odds.concat(evens)
687 /// .assert_eq(data);
688 /// });
689 /// ```
690 pub fn assert_eq(self, other: Self)
691 where
692 D: crate::ExchangeData+Hashable,
693 R: crate::ExchangeData+Hashable,
694 T: Lattice+Ord,
695 {
696 self.negate()
697 .concat(other)
698 .assert_empty();
699 }
700 }
701
702 use crate::trace::{Trace, Builder};
703 use crate::operators::arrange::{Arranged, TraceAgent};
704
705 impl <'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
706 where
707 T: Timestamp + Lattice + Ord,
708 K: crate::ExchangeData+Hashable,
709 V: crate::ExchangeData,
710 R: crate::ExchangeData+Semigroup,
711 {
712 /// Applies a reduction function on records grouped by key.
713 ///
714 /// Input data must be structured as `(key, val)` pairs.
715 /// The user-supplied reduction function takes as arguments
716 ///
717 /// 1. a reference to the key,
718 /// 2. a reference to the slice of values and their accumulated updates,
        /// 3. a mutable reference to a vector to populate with output values and accumulated updates.
720 ///
721 /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
722 /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
723 /// `Ord` implementations.
724 ///
725 /// # Examples
726 ///
727 /// ```
728 /// use differential_dataflow::input::Input;
729 ///
730 /// ::timely::example(|scope| {
731 /// // report the smallest value for each group
732 /// scope.new_collection_from(1 .. 10).1
733 /// .map(|x| (x / 3, x))
734 /// .reduce(|_key, input, output| {
735 /// output.push((*input[0].0, 1))
736 /// });
737 /// });
738 /// ```
739 pub fn reduce<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, logic: L) -> Collection<'scope, T, (K, V2), R2>
740 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
741 self.reduce_named("Reduce", logic)
742 }
743
744 /// As `reduce` with the ability to name the operator.
745 pub fn reduce_named<L, V2: crate::Data, R2: Ord+Abelian+'static>(self, name: &str, logic: L) -> Collection<'scope, T, (K, V2), R2>
746 where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
747 use crate::trace::implementations::{ValBuilder, ValSpine};
748
749 self.arrange_by_key_named(&format!("Arrange: {}", name))
750 .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<K,V2,_,_>,_>(
751 name,
752 logic,
753 |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
754 )
755 .as_collection(|k,v| (k.clone(), v.clone()))
756 }
757
758 /// Applies `reduce` to arranged data, and returns an arrangement of output data.
759 ///
760 /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
761 /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
762 ///
763 /// # Examples
764 ///
765 /// ```
766 /// use differential_dataflow::input::Input;
767 /// use differential_dataflow::trace::Trace;
768 /// use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
769 ///
770 /// ::timely::example(|scope| {
771 ///
772 /// let trace =
773 /// scope.new_collection_from(1 .. 10u32).1
774 /// .map(|x| (x, x))
775 /// .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
776 /// "Example",
777 /// move |_key, src, dst| dst.push((*src[0].0, 1))
778 /// )
779 /// .trace;
780 /// });
781 /// ```
782 pub fn reduce_abelian<L, Bu, T2>(self, name: &str, mut logic: L) -> Arranged<'scope, TraceAgent<T2>>
783 where
784 T2: for<'a> Trace<Key<'a>= &'a K, ValOwn = V, Time=T, Diff: Abelian>+'static,
785 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
786 L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static,
787 {
788 self.reduce_core::<_,Bu,T2>(name, move |key, input, output, change| {
789 if !input.is_empty() { logic(key, input, change); }
790 change.extend(output.drain(..).map(|(x,mut d)| { d.negate(); (x, d) }));
791 crate::consolidation::consolidate(change);
792 })
793 }
794
795 /// Solves for output updates when presented with inputs and would-be outputs.
796 ///
797 /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
798 /// and it may not be safe to index into the first element.
799 /// At least one of the two collections will be non-empty.
800 pub fn reduce_core<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
801 where
802 V: Clone+'static,
803 T2: for<'a> Trace<Key<'a>=&'a K, ValOwn = V, Time=T>+'static,
804 Bu: Builder<Time=T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
805 L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static,
806 {
807 self.arrange_by_key_named(&format!("Arrange: {}", name))
808 .reduce_core::<_,Bu,_,_>(
809 name,
810 logic,
811 |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
812 )
813 }
814 }
815
816 impl<'scope, T, K, R1> Collection<'scope, T, K, R1>
817 where
818 T: Timestamp + Lattice + Ord,
819 K: crate::ExchangeData+Hashable,
820 R1: crate::ExchangeData+Semigroup
821 {
822
823 /// Reduces the collection to one occurrence of each distinct element.
824 ///
825 /// # Examples
826 ///
827 /// ```
828 /// use differential_dataflow::input::Input;
829 ///
830 /// ::timely::example(|scope| {
831 /// // report at most one of each key.
832 /// scope.new_collection_from(1 .. 10).1
833 /// .map(|x| x / 3)
834 /// .distinct();
835 /// });
836 /// ```
837 pub fn distinct(self) -> Collection<'scope, T, K, isize> {
838 self.distinct_core()
839 }
840
841 /// Distinct for general integer differences.
842 ///
843 /// This method allows `distinct` to produce collections whose difference
844 /// type is something other than an `isize` integer, for example perhaps an
845 /// `i32`.
846 pub fn distinct_core<R2: Ord+Abelian+'static+From<i8>>(self) -> Collection<'scope, T, K, R2> {
847 self.threshold_named("Distinct", |_,_| R2::from(1i8))
848 }
849
850 /// Transforms the multiplicity of records.
851 ///
852 /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
853 /// least the computation may behave as if it does. Otherwise, the transformation
854 /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
855 ///
856 /// # Examples
857 ///
858 /// ```
859 /// use differential_dataflow::input::Input;
860 ///
861 /// ::timely::example(|scope| {
862 /// // report at most one of each key.
863 /// scope.new_collection_from(1 .. 10).1
864 /// .map(|x| x / 3)
865 /// .threshold(|_,c| c % 2);
866 /// });
867 /// ```
868 pub fn threshold<R2: Ord+Abelian+'static, F: FnMut(&K, &R1)->R2+'static>(self, thresh: F) -> Collection<'scope, T, K, R2> {
869 self.threshold_named("Threshold", thresh)
870 }
871
872 /// A `threshold` with the ability to name the operator.
873 pub fn threshold_named<R2: Ord+Abelian+'static, F: FnMut(&K,&R1)->R2+'static>(self, name: &str, mut thresh: F) -> Collection<'scope, T, K, R2> {
874 use crate::trace::implementations::{KeyBuilder, KeySpine};
875
876 self.arrange_by_self_named(&format!("Arrange: {}", name))
877 .reduce_abelian::<_,KeyBuilder<K,T,R2>,KeySpine<K,T,R2>,_>(
878 name,
879 move |k,s,t| t.push(((), thresh(k, &s[0].1))),
880 |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
881 )
882 .as_collection(|k,_| k.clone())
883 }
884
885 }
886
887 impl<'scope, T, K, R> Collection<'scope, T, K, R>
888 where
889 T: Timestamp + Lattice + Ord,
890 K: crate::ExchangeData+Hashable,
891 R: crate::ExchangeData+Semigroup
892 {
893
894 /// Counts the number of occurrences of each element.
895 ///
896 /// # Examples
897 ///
898 /// ```
899 /// use differential_dataflow::input::Input;
900 ///
901 /// ::timely::example(|scope| {
902 /// // report the number of occurrences of each key
903 /// scope.new_collection_from(1 .. 10).1
904 /// .map(|x| x / 3)
905 /// .count();
906 /// });
907 /// ```
908 pub fn count(self) -> Collection<'scope, T, (K, R), isize> { self.count_core() }
909
910 /// Count for general integer differences.
911 ///
912 /// This method allows `count` to produce collections whose difference
913 /// type is something other than an `isize` integer, for example perhaps an
914 /// `i32`.
915 pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(self) -> Collection<'scope, T, (K, R), R2> {
916 use crate::trace::implementations::{ValBuilder, ValSpine};
917 self.arrange_by_self_named("Arrange: Count")
918 .reduce_abelian::<_,ValBuilder<K,R,T,R2>,ValSpine<K,R,T,R2>,_>(
919 "Count",
920 |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))),
921 |vec, key, upds| { vec.clear(); vec.extend(upds.drain(..).map(|(v,t,r)| ((key.clone(), v),t,r))); },
922 )
923 .as_collection(|k,c| (k.clone(), c.clone()))
924 }
925 }
926
927 /// Methods which require data be arrangeable.
928 impl<'scope, T, D, R> Collection<'scope, T, D, R>
929 where
930 T: Timestamp + Clone + 'static + Lattice,
931 D: crate::ExchangeData+Hashable,
932 R: crate::ExchangeData+Semigroup,
933 {
934 /// Aggregates the weights of equal records into at most one record.
935 ///
936 /// This method uses the type `D`'s `hashed()` method to partition the data. The data are
937 /// accumulated in place, each held back until their timestamp has completed.
938 ///
939 /// # Examples
940 ///
941 /// ```
942 /// use differential_dataflow::input::Input;
943 ///
944 /// ::timely::example(|scope| {
945 ///
946 /// let x = scope.new_collection_from(1 .. 10u32).1;
947 ///
948 /// x.clone()
949 /// .negate()
950 /// .concat(x)
951 /// .consolidate() // <-- ensures cancellation occurs
952 /// .assert_empty();
953 /// });
954 /// ```
955 pub fn consolidate(self) -> Self {
956 use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine};
957 self.consolidate_named::<KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone())
958 }
959
960 /// As `consolidate` but with the ability to name the operator, specify the trace type,
961 /// and provide the function `reify` to produce owned keys and values..
962 pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
963 where
964 Ba: crate::trace::Batcher<Input=Vec<((D,()),T,R)>, Time=T> + 'static,
965 Tr: for<'a> crate::trace::Trace<Time=T,Diff=R>+'static,
966 Bu: crate::trace::Builder<Time=Tr::Time, Input=Ba::Output, Output=Tr::Batch>,
967 F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static,
968 {
969 use crate::operators::arrange::arrangement::Arrange;
970 self.map(|k| (k, ()))
971 .arrange_named::<Ba, Bu, Tr>(name)
972 .as_collection(reify)
973 }
974
975 /// Aggregates the weights of equal records.
976 ///
977 /// Unlike `consolidate`, this method does not exchange data and does not
978 /// ensure that at most one copy of each `(data, time)` pair exists in the
979 /// results. Instead, it acts on each batch of data and collapses equivalent
980 /// `(data, time)` pairs found therein, suppressing any that accumulate to
981 /// zero.
982 ///
983 /// # Examples
984 ///
985 /// ```
986 /// use differential_dataflow::input::Input;
987 ///
988 /// ::timely::example(|scope| {
989 ///
990 /// let x = scope.new_collection_from(1 .. 10u32).1;
991 ///
992 /// // nothing to assert, as no particular guarantees.
993 /// x.clone()
994 /// .negate()
995 /// .concat(x)
996 /// .consolidate_stream();
997 /// });
998 /// ```
999 pub fn consolidate_stream(self) -> Self {
1000
1001 use timely::dataflow::channels::pact::Pipeline;
1002 use timely::dataflow::operators::Operator;
1003 use crate::collection::AsCollection;
1004 use crate::consolidation::ConsolidatingContainerBuilder;
1005
1006 self.inner
1007 .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
1008
1009 move |input, output| {
1010 input.for_each(|time, data| {
1011 output.session_with_builder(&time).give_iterator(data.drain(..));
1012 })
1013 }
1014 })
1015 .as_collection()
1016 }
1017 }
1018
1019 use crate::trace::implementations::{ValSpine, ValBatcher, ValBuilder};
1020 use crate::trace::implementations::{KeySpine, KeyBatcher, KeyBuilder};
1021 use crate::operators::arrange::Arrange;
1022
1023 impl<'scope, T, K, V, R> Arrange<'scope, T, Vec<((K, V), T, R)>> for Collection<'scope, T, (K, V), R>
1024 where
1025 T: Timestamp + Lattice,
1026 K: crate::ExchangeData + Hashable,
1027 V: crate::ExchangeData,
1028 R: crate::ExchangeData + Semigroup,
1029 {
1030 fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1031 where
1032 Ba: crate::trace::Batcher<Input=Vec<((K, V), T, R)>, Time=T> + 'static,
1033 Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
1034 Tr: crate::trace::Trace<Time=T> + 'static,
1035 {
1036 let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,V),T,R)| (update.0).0.hashed().into());
1037 crate::operators::arrange::arrangement::arrange_core::<_, Ba, Bu, _>(self.inner, exchange, name)
1038 }
1039 }
1040
1041 impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Arrange<'scope, T, Vec<((K, ()), T, R)>> for Collection<'scope, T, K, R>
1042 where
1043 T: Timestamp + Lattice + Ord,
1044 {
1045 fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<'scope, TraceAgent<Tr>>
1046 where
1047 Ba: crate::trace::Batcher<Input=Vec<((K,()),T,R)>, Time=T> + 'static,
1048 Bu: crate::trace::Builder<Time=T, Input=Ba::Output, Output = Tr::Batch>,
1049 Tr: crate::trace::Trace<Time=T> + 'static,
1050 {
1051 let exchange = timely::dataflow::channels::pact::Exchange::new(move |update: &((K,()),T,R)| (update.0).0.hashed().into());
1052 crate::operators::arrange::arrangement::arrange_core::<_,Ba,Bu,_>(self.map(|k| (k, ())).inner, exchange, name)
1053 }
1054 }
1055
1056
1057 impl<'scope, T, K: crate::ExchangeData+Hashable, V: crate::ExchangeData, R: crate::ExchangeData+Semigroup> Collection<'scope, T, (K,V), R>
1058 where
1059 T: Timestamp + Lattice + Ord,
1060 {
1061 /// Arranges a collection of `(Key, Val)` records by `Key`.
1062 ///
1063 /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
1064 /// This trace is current for all times completed by the output stream, which can be used to
1065 /// safely identify the stable times and values in the trace.
1066 pub fn arrange_by_key(self) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1067 self.arrange_by_key_named("ArrangeByKey")
1068 }
1069
1070 /// As `arrange_by_key` but with the ability to name the arrangement.
1071 pub fn arrange_by_key_named(self, name: &str) -> Arranged<'scope, TraceAgent<ValSpine<K, V, T, R>>> {
1072 self.arrange_named::<ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name)
1073 }
1074 }
1075
1076 impl<'scope, T, K: crate::ExchangeData+Hashable, R: crate::ExchangeData+Semigroup> Collection<'scope, T, K, R>
1077 where
1078 T: Timestamp + Lattice + Ord,
1079 {
1080 /// Arranges a collection of `Key` records by `Key`.
1081 ///
1082 /// This operator arranges a collection of records into a shared trace, whose contents it maintains.
1083 /// This trace is current for all times complete in the output stream, which can be used to safely
1084 /// identify the stable times and values in the trace.
1085 pub fn arrange_by_self(self) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1086 self.arrange_by_self_named("ArrangeBySelf")
1087 }
1088
1089 /// As `arrange_by_self` but with the ability to name the arrangement.
1090 pub fn arrange_by_self_named(self, name: &str) -> Arranged<'scope, TraceAgent<KeySpine<K, T, R>>> {
1091 self.map(|k| (k, ()))
1092 .arrange_named::<KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name)
1093 }
1094 }
1095
1096 impl<'scope, T, K, V, R> Collection<'scope, T, (K, V), R>
1097 where
1098 T: Timestamp + Lattice + Ord,
1099 K: crate::ExchangeData+Hashable,
1100 V: crate::ExchangeData,
1101 R: crate::ExchangeData+Semigroup,
1102 {
1103 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
1104 ///
1105 /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
1106 ///
1107 /// # Examples
1108 ///
1109 /// ```
1110 /// use differential_dataflow::input::Input;
1111 ///
1112 /// ::timely::example(|scope| {
1113 ///
1114 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1115 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1116 /// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
1117 ///
1118 /// x.join(y)
1119 /// .assert_eq(z);
1120 /// });
1121 /// ```
1122 pub fn join<V2, R2>(self, other: Collection<'scope, T, (K,V2), R2>) -> Collection<'scope, T, (K,(V,V2)), <R as Multiply<R2>>::Output>
1123 where
1124 K: crate::ExchangeData,
1125 V2: crate::ExchangeData,
1126 R2: crate::ExchangeData+Semigroup,
1127 R: Multiply<R2, Output: Semigroup+'static>,
1128 {
1129 self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
1130 }
1131
1132 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
1133 ///
1134 /// # Examples
1135 ///
1136 /// ```
1137 /// use differential_dataflow::input::Input;
1138 ///
1139 /// ::timely::example(|scope| {
1140 ///
1141 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1142 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
1143 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1144 ///
1145 /// x.join_map(y, |_key, &a, &b| (a,b))
1146 /// .assert_eq(z);
1147 /// });
1148 /// ```
1149 pub fn join_map<V2: crate::ExchangeData, R2: crate::ExchangeData+Semigroup, D: crate::Data, L>(self, other: Collection<'scope, T, (K, V2), R2>, mut logic: L) -> Collection<'scope, T, D, <R as Multiply<R2>>::Output>
1150 where R: Multiply<R2, Output: Semigroup+'static>, L: FnMut(&K, &V, &V2)->D+'static {
1151 let arranged1 = self.arrange_by_key();
1152 let arranged2 = other.arrange_by_key();
1153 arranged1.join_core(arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
1154 }
1155
1156 /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
1157 ///
1158 /// When the second collection contains frequencies that are either zero or one this is the more traditional
1159 /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
1160 /// the counts of the records in the first input.
1161 ///
1162 /// # Examples
1163 ///
1164 /// ```
1165 /// use differential_dataflow::input::Input;
1166 ///
1167 /// ::timely::example(|scope| {
1168 ///
1169 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1170 /// let y = scope.new_collection_from(vec![0, 2]).1;
1171 /// let z = scope.new_collection_from(vec![(0, 1)]).1;
1172 ///
1173 /// x.semijoin(y)
1174 /// .assert_eq(z);
1175 /// });
1176 /// ```
1177 pub fn semijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), <R as Multiply<R2>>::Output>
1178 where R: Multiply<R2, Output: Semigroup+'static> {
1179 let arranged1 = self.arrange_by_key();
1180 let arranged2 = other.arrange_by_self();
1181 arranged1.join_core(arranged2, |k,v,_| Some((k.clone(), v.clone())))
1182 }
1183
1184 /// Subtracts the semijoin with `other` from `self`.
1185 ///
1186 /// In the case that `other` has multiplicities zero or one this results
1187 /// in a relational antijoin, in which we discard input records whose key
1188 /// is present in `other`. If the multiplicities could be other than zero
1189 /// or one, the semantic interpretation of this operator is less clear.
1190 ///
1191 /// In almost all cases, you should ensure that `other` has multiplicities
1192 /// that are zero or one, perhaps by using the `distinct` operator.
1193 ///
1194 /// # Examples
1195 ///
1196 /// ```
1197 /// use differential_dataflow::input::Input;
1198 ///
1199 /// ::timely::example(|scope| {
1200 ///
1201 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
1202 /// let y = scope.new_collection_from(vec![0, 2]).1;
1203 /// let z = scope.new_collection_from(vec![(1, 3)]).1;
1204 ///
1205 /// x.antijoin(y)
1206 /// .assert_eq(z);
1207 /// });
1208 /// ```
1209 pub fn antijoin<R2: crate::ExchangeData+Semigroup>(self, other: Collection<'scope, T, K, R2>) -> Collection<'scope, T, (K, V), R>
1210 where R: Multiply<R2, Output=R>, R: Abelian+'static {
1211 self.clone().concat(self.semijoin(other).negate())
1212 }
1213
1214 /// Joins two arranged collections with the same key type.
1215 ///
1216 /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
1217 /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
1218 /// every value returned by the iterator.
1219 ///
1220 /// # Examples
1221 ///
1222 /// ```
1223 /// use differential_dataflow::input::Input;
1224 /// use differential_dataflow::trace::Trace;
1225 ///
1226 /// ::timely::example(|scope| {
1227 ///
1228 /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
1229 /// .arrange_by_key();
1230 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
1231 /// .arrange_by_key();
1232 ///
1233 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
1234 ///
1235 /// x.join_core(y, |_key, &a, &b| Some((a, b)))
1236 /// .assert_eq(z);
1237 /// });
1238 /// ```
1239 pub fn join_core<Tr2,I,L> (self, stream2: Arranged<'scope, Tr2>, result: L) -> Collection<'scope, T,I::Item,<R as Multiply<Tr2::Diff>>::Output>
1240 where
1241 Tr2: for<'a> crate::trace::TraceReader<Key<'a>=&'a K, Time=T>+Clone+'static,
1242 R: Multiply<Tr2::Diff, Output: Semigroup+'static>,
1243 I: IntoIterator<Item: crate::Data>,
1244 L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
1245 {
1246 self.arrange_by_key()
1247 .join_core(stream2, result)
1248 }
1249 }
1250}
1251
/// Conversion to a differential dataflow Collection.
///
/// The type parameters mirror those of the produced [`Collection`]: `'scope` and `T`
/// are the scope lifetime and timestamp, and `C` is the container type.
pub trait AsCollection<'scope, T: Timestamp, C> {
    /// Converts the type to a differential dataflow collection.
    ///
    /// The conversion consumes `self`.
    fn as_collection(self) -> Collection<'scope, T, C>;
}
1257
1258impl<'scope, T: Timestamp, C> AsCollection<'scope, T, C> for Stream<'scope, T, C> {
1259 /// Converts the type to a differential dataflow collection.
1260 ///
1261 /// By calling this method, you guarantee that the timestamp invariant (as documented on
1262 /// [Collection]) is upheld. This method will not check it.
1263 fn as_collection(self) -> Collection<'scope, T, C> {
1264 Collection::<T,C>::new(self)
1265 }
1266}
1267
1268/// Concatenates multiple collections.
1269///
1270/// This method has the effect of a sequence of calls to `concat`, but it does
1271/// so in one operator rather than a chain of many operators.
1272///
1273/// # Examples
1274///
1275/// ```
1276/// use differential_dataflow::input::Input;
1277///
1278/// ::timely::example(|scope| {
1279///
1280/// let data = scope.new_collection_from(1 .. 10).1;
1281///
1282/// let odds = data.clone().filter(|x| x % 2 == 1);
1283/// let evens = data.clone().filter(|x| x % 2 == 0);
1284///
1285/// differential_dataflow::collection::concatenate(scope, vec![odds, evens])
1286/// .assert_eq(data);
1287/// });
1288/// ```
1289pub fn concatenate<'scope, T, C, I>(scope: Scope<'scope, T>, iterator: I) -> Collection<'scope, T, C>
1290where
1291 T: Timestamp,
1292 C: Container,
1293 I: IntoIterator<Item=Collection<'scope, T, C>>,
1294{
1295 scope
1296 .concatenate(iterator.into_iter().map(|x| x.inner))
1297 .as_collection()
1298}
1299
1300/// Traits that can be implemented by containers to provide functionality to collections based on them.
1301pub mod containers {
1302
1303 /// A container that can negate its updates.
1304 pub trait Negate {
1305 /// Negates Abelian differences of each update.
1306 fn negate(self) -> Self;
1307 }
1308
1309 /// A container that can enter from timestamp `T1` into timestamp `T2`.
1310 pub trait Enter<T1, T2> {
1311 /// The resulting container type.
1312 type InnerContainer;
1313 /// Update timestamps from `T1` to `T2`.
1314 fn enter(self) -> Self::InnerContainer;
1315 }
1316
1317 /// A container that can leave from timestamp `T1` into timestamp `T2`.
1318 pub trait Leave<T1, T2> {
1319 /// The resulting container type.
1320 type OuterContainer;
1321 /// Update timestamps from `T1` to `T2`.
1322 fn leave(self) -> Self::OuterContainer;
1323 }
1324
1325 /// A container that can advance timestamps by a summary `TS`.
1326 pub trait ResultsIn<TS> {
1327 /// Advance times in the container by `step`.
1328 fn results_in(self, step: &TS) -> Self;
1329 }
1330
1331
1332 /// Implementations of container traits for the `Vec` container.
1333 mod vec {
1334
1335 use timely::progress::{Timestamp, timestamp::Refines};
1336 use crate::collection::Abelian;
1337
1338 use super::{Negate, Enter, Leave, ResultsIn};
1339
1340 impl<D, T, R: Abelian> Negate for Vec<(D, T, R)> {
1341 fn negate(mut self) -> Self {
1342 for (_data, _time, diff) in self.iter_mut() { diff.negate(); }
1343 self
1344 }
1345 }
1346
1347 impl<D, T1: Timestamp, T2: Refines<T1>, R> Enter<T1, T2> for Vec<(D, T1, R)> {
1348 type InnerContainer = Vec<(D, T2, R)>;
1349 fn enter(self) -> Self::InnerContainer {
1350 self.into_iter().map(|(d,t1,r)| (d,T2::to_inner(t1),r)).collect()
1351 }
1352 }
1353
1354 impl<D, T1: Refines<T2>, T2: Timestamp, R> Leave<T1, T2> for Vec<(D, T1, R)> {
1355 type OuterContainer = Vec<(D, T2, R)>;
1356 fn leave(self) -> Self::OuterContainer {
1357 self.into_iter().map(|(d,t1,r)| (d,t1.to_outer(),r)).collect()
1358 }
1359 }
1360
1361 impl<D, T: Timestamp, R> ResultsIn<T::Summary> for Vec<(D, T, R)> {
1362 fn results_in(self, step: &T::Summary) -> Self {
1363 use timely::progress::PathSummary;
1364 self.into_iter().filter_map(move |(d,t,r)| step.results_in(&t).map(|t| (d,t,r))).collect()
1365 }
1366 }
1367 }
1368
1369 /// Implementations of container traits for the `Rc` container.
1370 mod rc {
1371 use std::rc::Rc;
1372
1373 use timely::progress::{Timestamp, timestamp::Refines};
1374
1375 use super::{Negate, Enter, Leave, ResultsIn};
1376
1377 impl<C: Negate+Clone+Default> Negate for Rc<C> {
1378 fn negate(mut self) -> Self {
1379 std::mem::take(Rc::make_mut(&mut self)).negate().into()
1380 }
1381 }
1382
1383 impl<C: Enter<T1, T2>+Clone+Default, T1: Timestamp, T2: Refines<T1>> Enter<T1, T2> for Rc<C> {
1384 type InnerContainer = Rc<C::InnerContainer>;
1385 fn enter(mut self) -> Self::InnerContainer {
1386 std::mem::take(Rc::make_mut(&mut self)).enter().into()
1387 }
1388 }
1389
1390 impl<C: Leave<T1, T2>+Clone+Default, T1: Refines<T2>, T2: Timestamp> Leave<T1, T2> for Rc<C> {
1391 type OuterContainer = Rc<C::OuterContainer>;
1392 fn leave(mut self) -> Self::OuterContainer {
1393 std::mem::take(Rc::make_mut(&mut self)).leave().into()
1394 }
1395 }
1396
1397 impl<C: ResultsIn<TS>+Clone+Default, TS> ResultsIn<TS> for Rc<C> {
1398 fn results_in(mut self, step: &TS) -> Self {
1399 std::mem::take(Rc::make_mut(&mut self)).results_in(step).into()
1400 }
1401 }
1402 }
1403}