pub type VecCollection<G, D, R = isize> = Collection<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>;
An evolving collection of values of type D, backed by Rust Vec types as containers.
The Collection type is the core abstraction in differential dataflow programs. As you write your
differential dataflow computation, you write as if the collection is a static dataset to which you
apply functional transformations, creating new collections. Once your computation is written, you
are able to mutate the collection (by inserting and removing elements); differential dataflow will
propagate changes through your functional computation and report the corresponding changes to the
output collections.
Each vec collection has three generic parameters. The parameter G is for the scope in which the
collection exists; as you write more complicated programs you may wish to introduce nested scopes
(e.g. for iteration) and this parameter tracks the scope (for timely dataflow’s benefit). The D
parameter is the type of data in your collection, for example String, or (u32, Vec<Option<()>>).
The R parameter represents the types of changes that the data undergo, and is most commonly (and
defaults to) isize, representing changes to the occurrence count of each record.
This type definition instantiates the Collection type with a Vec<(D, G::Timestamp, R)>.
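As an illustration of the three parameters, the following plain-Rust sketch models a collection as a list of `(data, time, diff)` updates and accumulates them into occurrence counts. It uses only the standard library and is not how differential dataflow stores or processes data. The names `Update` and `contents_at` are hypothetical, and times are assumed to be totally ordered `u64` values.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the `(D, G::Timestamp, R)` triples held in the `Vec` container.
type Update<D> = (D, u64, isize);

/// Accumulates every update at or before `time` into per-record occurrence counts,
/// dropping records whose count is zero.
fn contents_at<D: Ord + Clone>(updates: &[Update<D>], time: u64) -> BTreeMap<D, isize> {
    let mut counts = BTreeMap::new();
    for (data, t, diff) in updates {
        if *t <= time {
            *counts.entry(data.clone()).or_insert(0) += *diff;
        }
    }
    counts.retain(|_, count| *count != 0);
    counts
}
```

In this model, inserting a record is an update with a positive `diff` and removing it is a later update with a negative `diff`.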
Aliased Type§
pub struct Collection<G, D, R = isize> {
    pub inner: Stream<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>,
}
Fields§
§inner: Stream<G, Vec<(D, <G as ScopeParent>::Timestamp, R)>>
The underlying timely dataflow stream.
This field is exposed to support direct timely dataflow manipulation when required, but it is not intended to be the idiomatic way to work with the collection.
The timestamp recorded in each update must always be at least the timestamp of the message that carries it, in the timely-dataflow sense. If this invariant is not upheld, differential operators may behave unexpectedly.
Implementations§
impl<G, D, R, T, TOuter> VecCollection<G, D, R>
pub fn enter_dynamic(self, _level: usize) -> Self
Enters a dynamically created scope which has level timestamp coordinates.
pub fn leave_dynamic(self, level: usize) -> Self
Leaves a dynamically created scope which has level timestamp coordinates.
impl<G: Scope, D: Clone + 'static, R: Clone + 'static> Collection<G, D, R>
pub fn map<D2, L>(self, logic: L) -> Collection<G, D2, R>
Creates a new collection by applying the supplied function to each input element.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
pub fn map_in_place<L>(self, logic: L) -> Collection<G, D, R>
Creates a new collection by applying the supplied function to each input element.
Although the name suggests in-place mutation, this function does not change the source collection,
but rather re-uses the underlying allocations in its implementation. The method is semantically
equivalent to map, but can be more efficient.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
pub fn flat_map<I, L>(self, logic: L) -> Collection<G, I::Item, R>
Creates a new collection by applying the supplied function to each input element and accumulating the results.
This method extracts an iterator from each input element, and extracts the full contents of the iterator. Be warned that if the iterators produce substantial amounts of data, they are currently fully drained before attempting to consolidate the results.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.flat_map(|x| 0 .. x);
});
pub fn filter<L>(self, logic: L) -> Collection<G, D, R>
Creates a new collection containing those input records satisfying the supplied predicate.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
pub fn explode<D2, R2, I, L>(
    self,
    logic: L,
) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
Replaces each record with another, with a new difference type.
This method is most commonly used to take records containing aggregatable data (e.g. numbers to be summed) and move the data into the difference component. This will allow differential dataflow to update in-place.
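A minimal sketch of the idea, written against plain `Vec`s rather than the library: each record is replaced by `(new_data, new_diff)` pairs, and every new difference is multiplied by the difference of the update that produced it. The names `Update` and `explode_model` are hypothetical, and `isize` differences with `u64` times are assumed.

```rust
/// Hypothetical model of one update: (data, time, diff).
type Update<D> = (D, u64, isize);

/// Models `explode`: `logic` turns each record into `(new_data, new_diff)` pairs,
/// and each new diff is multiplied by the diff of the update that produced it.
fn explode_model<D, D2, I, L>(updates: Vec<Update<D>>, logic: L) -> Vec<Update<D2>>
where
    I: IntoIterator<Item = (D2, isize)>,
    L: Fn(D) -> I,
{
    let mut output = Vec::new();
    for (data, time, diff) in updates {
        for (new_data, new_diff) in logic(data) {
            output.push((new_data, time, new_diff * diff));
        }
    }
    output
}
```

With `(item, amount)` records, moving `amount` into the difference means a later sum over `item` only needs to add differences, and retracting a record contributes the negated amount.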
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let nums = scope.new_collection_from(0 .. 10).1;
let x1 = nums.clone().flat_map(|x| 0 .. x);
let x2 = nums.map(|x| (x, 9 - x))
.explode(|(x,y)| Some((x,y)));
x1.assert_eq(x2);
});
pub fn join_function<D2, R2, I, L>(
    self,
    logic: L,
) -> Collection<G, D2, <R2 as Multiply<R>>::Output>
Joins each record against a collection defined by the function logic.
This method performs what is essentially a join with the collection of records (x, logic(x)).
Rather than materialize this second relation, logic is applied to each record and the appropriate
modifications made to the results, namely joining timestamps and multiplying differences.
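The "joining timestamps and multiplying differences" step can be sketched in plain Rust as follows. This is a model under stated assumptions, not the library's code: times are `u64`, so the lattice join of two times is their maximum, differences are `isize`, and `Update` and `join_function_model` are hypothetical names.

```rust
/// Hypothetical model of one update: (data, time, diff).
type Update<D> = (D, u64, isize);

/// Models `join_function`: each output of `logic` carries its own time and diff,
/// which are combined with the time and diff of the input update.
fn join_function_model<D, D2, I, L>(updates: Vec<Update<D>>, logic: L) -> Vec<Update<D2>>
where
    I: IntoIterator<Item = (D2, u64, isize)>,
    L: Fn(D) -> I,
{
    let mut output = Vec::new();
    for (data, time, diff) in updates {
        for (new_data, new_time, new_diff) in logic(data) {
            // For totally ordered times the lattice join is the maximum.
            output.push((new_data, time.max(new_time), new_diff * diff));
        }
    }
    output
}
```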
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// creates `x` copies of `2*x` from time `3*x` until `4*x`,
// for x from 0 through 9.
scope.new_collection_from(0 .. 10isize).1
.join_function(|x|
// data time diff
vec![(2*x, (3*x) as u64, x),
(2*x, (4*x) as u64, -x)]
);
});
pub fn enter_at<'a, T, F>(
    self,
    child: &Iterative<'a, G, T>,
    initial: F,
) -> Collection<Iterative<'a, G, T>, D, R>
Brings a Collection into a nested scope, at varying times.
The initial function indicates the time at which each element of the Collection should appear.
§Examples
use timely::dataflow::Scope;
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let result = scope.iterative::<u64,_,_>(|child| {
data.clone()
.enter_at(child, |x| *x)
.leave()
});
data.assert_eq(result);
});
pub fn delay<F>(self, func: F) -> Collection<G, D, R>
Delays each difference by a supplied function.
It is assumed that func only advances timestamps; this is not verified, and things may go horribly
wrong if that assumption is incorrect. It is also critical that func be monotonic: if two times are
ordered, they should have the same order or compare equal once func is applied to them (this
is because we advance the timely capability with the same logic, and it must remain less_equal
to all of the data timestamps).
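As a sketch of a function that meets both requirements, rounding a `u64` time up to the next multiple of a period only advances times and preserves their order. The helper names `round_up` and `delay_model` are hypothetical, and the model below works on a plain `Vec` of updates rather than a dataflow stream.

```rust
/// Hypothetical stand-in for one `(data, time, diff)` update with `u64` times.
type Update<D> = (D, u64, isize);

/// Rounds `time` up to the next multiple of `period` (which must be non-zero).
/// The result is never less than `time`, and larger inputs never give smaller outputs.
fn round_up(time: u64, period: u64) -> u64 {
    ((time + period - 1) / period) * period
}

/// Models `delay`: rewrites the time of every update with `func`,
/// checking the "only advances timestamps" assumption as it goes.
fn delay_model<D, F>(updates: Vec<Update<D>>, func: F) -> Vec<Update<D>>
where
    F: Fn(&u64) -> u64,
{
    updates
        .into_iter()
        .map(|(data, time, diff)| {
            let delayed = func(&time);
            assert!(delayed >= time, "delay functions must only advance timestamps");
            (data, delayed, diff)
        })
        .collect()
}
```

A function such as `|t| 100 - t` would violate both requirements, since it can move times backwards and reverses their order.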
pub fn inspect<F>(self, func: F) -> Collection<G, D, R>
Applies a supplied function to each update.
This method is most commonly used to report information back to the user, often for debugging purposes. Any function can be used here, but be warned that the incremental nature of differential dataflow does not guarantee that it will be called as many times as you might expect.
The (data, time, diff) triples indicate a change diff to the frequency of data which takes effect
at the logical time time. When times are totally ordered (for example, usize), these updates reflect
the changes along the sequence of collections. For partially ordered times, the mathematics are more
interesting and less intuitive, unfortunately.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect(|x| println!("error: {:?}", x));
});
pub fn inspect_batch<F>(self, func: F) -> Collection<G, D, R>
Applies a supplied function to each batch of updates.
This method is analogous to inspect, but operates on batches and reveals the timestamp of the
timely dataflow capability associated with the batch of updates. The observed batching depends
on how the system executes, and may vary run to run.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map_in_place(|x| *x *= 2)
.filter(|x| x % 2 == 1)
.inspect_batch(|t,xs| println!("errors @ {:?}: {:?}", t, xs));
});
pub fn assert_empty(self)
where
    D: ExchangeData + Hashable,
    R: ExchangeData + Hashable + Semigroup,
    G::Timestamp: Lattice + Ord,
Asserts that the collection is empty at all times.
Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collection could be non-empty. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
scope.new_collection_from(1 .. 10).1
.map(|x| x * 2)
.filter(|x| x % 2 == 1)
.assert_empty();
});
impl<G: Scope<Timestamp: Clone + 'static>, D: Clone + 'static, R: Abelian + 'static> Collection<G, D, R>
Methods requiring an Abelian difference, to support negation.
pub fn assert_eq(self, other: Self)
Asserts that the two collections are equal at all times.
Because this is a dataflow fragment, the test is only applied as the computation is run. If the computation is not run, or not run to completion, there may be un-exercised times at which the collections could vary. Typically, a timely dataflow computation runs to completion on drop, and so clean exit from a program should indicate that this assertion never found cause to complain.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let data = scope.new_collection_from(1 .. 10).1;
let odds = data.clone().filter(|x| x % 2 == 1);
let evens = data.clone().filter(|x| x % 2 == 0);
odds.concat(evens)
.assert_eq(data);
});
impl<G, K, V, R> Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice + Ord>,
    K: ExchangeData + Hashable,
    V: ExchangeData,
    R: ExchangeData + Semigroup,
pub fn reduce<L, V2: Data, R2: Ord + Abelian + 'static>(
    self,
    logic: L,
) -> Collection<G, (K, V2), R2>
Applies a reduction function on records grouped by key.
Input data must be structured as (key, val) pairs.
The user-supplied reduction function takes as arguments
- a reference to the key,
- a reference to the slice of values and their accumulated updates,
- a mutable reference to a vector to populate with output values and accumulated updates.
The user logic is only invoked for non-empty input collections, and it is safe to assume that the
slice of input values is non-empty. The values are presented in sorted order, as defined by their
Ord implementations.
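The calling convention described above can be modelled in plain Rust for a single static snapshot of the input. This is only a sketch: `reduce_model` is a hypothetical name, differences are assumed to be `isize`, and the real operator works incrementally across times rather than recomputing from scratch.

```rust
use std::collections::BTreeMap;

/// Models `reduce` on one snapshot: groups by key, accumulates diffs per value,
/// and calls `logic` once per key with a non-empty, sorted slice of values.
fn reduce_model<K, V, V2, L>(records: Vec<((K, V), isize)>, mut logic: L) -> Vec<((K, V2), isize)>
where
    K: Ord + Clone,
    V: Ord,
    L: FnMut(&K, &[(&V, isize)], &mut Vec<(V2, isize)>),
{
    // BTreeMap keeps keys and values in the order given by their `Ord` implementations.
    let mut groups: BTreeMap<K, BTreeMap<V, isize>> = BTreeMap::new();
    for ((key, val), diff) in records {
        *groups.entry(key).or_default().entry(val).or_insert(0) += diff;
    }
    let mut result = Vec::new();
    for (key, vals) in &groups {
        let input: Vec<(&V, isize)> = vals
            .iter()
            .filter(|(_, diff)| **diff != 0)
            .map(|(val, diff)| (val, *diff))
            .collect();
        // The user logic is skipped for keys whose values have all cancelled out.
        if input.is_empty() {
            continue;
        }
        let mut output = Vec::new();
        logic(key, &input, &mut output);
        result.extend(output.into_iter().map(|(val, diff)| ((key.clone(), val), diff)));
    }
    result
}
```

Because the slice is sorted and non-empty, `input[0]` is always the smallest value for the key, which is what a "smallest value per group" reduction relies on.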
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// report the smallest value for each group
scope.new_collection_from(1 .. 10).1
.map(|x| (x / 3, x))
.reduce(|_key, input, output| {
output.push((*input[0].0, 1))
});
});
pub fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
    self,
    name: &str,
    logic: L,
) -> Collection<G, (K, V2), R2>
As reduce with the ability to name the operator.
pub fn reduce_abelian<L, Bu, T2>(
    self,
    name: &str,
    logic: L,
) -> Arranged<G, TraceAgent<T2>>
Applies reduce to arranged data, and returns an arrangement of output data.
This method is used by the more ergonomic reduce, distinct, and count methods, although
it can be very useful if one needs to manually attach and re-use existing arranged collections.
§Examples
use differential_dataflow::input::Input;
use differential_dataflow::trace::Trace;
use differential_dataflow::trace::implementations::{ValBuilder, ValSpine};
::timely::example(|scope| {
let trace =
scope.new_collection_from(1 .. 10u32).1
.map(|x| (x, x))
.reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
"Example",
move |_key, src, dst| dst.push((*src[0].0, 1))
)
.trace;
});
pub fn reduce_core<L, Bu, T2>(
    self,
    name: &str,
    logic: L,
) -> Arranged<G, TraceAgent<T2>>
Solves for output updates when presented with inputs and would-be outputs.
Unlike reduce_arranged, this method may be called with an empty input,
and it may not be safe to index into the first element.
At least one of the two collections will be non-empty.
impl<G, K, R1> Collection<G, K, R1>
pub fn distinct(self) -> Collection<G, K, isize>
Reduces the collection to one occurrence of each distinct element.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// report at most one of each key.
scope.new_collection_from(1 .. 10).1
.map(|x| x / 3)
.distinct();
});
pub fn distinct_core<R2: Ord + Abelian + 'static + From<i8>>(
    self,
) -> Collection<G, K, R2>
Distinct for general integer differences.
This method allows distinct to produce collections whose difference
type is something other than an isize integer, for example perhaps an
i32.
pub fn threshold<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
    self,
    thresh: F,
) -> Collection<G, K, R2>
Transforms the multiplicity of records.
The threshold function is obliged to map R1::zero to R2::zero, or at
least the computation may behave as if it does. Otherwise, the transformation
can be nearly arbitrary: the code does not assume any properties of threshold.
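A plain-Rust model of this behaviour on one snapshot of the input may help. It is a sketch with hypothetical names (`threshold_model`) and `isize` differences on both sides. Records whose accumulated count is zero are never shown to the function, which is why the function should behave as if it maps zero to zero.

```rust
use std::collections::BTreeMap;

/// Models `threshold` on one snapshot: accumulates the count of each record,
/// then replaces each non-zero count with `thresh(record, count)`.
fn threshold_model<K, F>(records: Vec<(K, isize)>, mut thresh: F) -> Vec<(K, isize)>
where
    K: Ord,
    F: FnMut(&K, &isize) -> isize,
{
    let mut counts: BTreeMap<K, isize> = BTreeMap::new();
    for (key, diff) in records {
        *counts.entry(key).or_insert(0) += diff;
    }
    counts
        .into_iter()
        .filter(|(_, count)| *count != 0) // absent records are never passed to `thresh`
        .map(|(key, count)| {
            let new_count = thresh(&key, &count);
            (key, new_count)
        })
        .filter(|(_, count)| *count != 0) // a zero result means the record is absent
        .collect()
}
```

In this model, a function returning 1 for every positive count reproduces `distinct`, while `|_, c| c % 2` keeps one copy of each record that occurs an odd number of times.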
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// retain one copy of each key that occurs an odd number of times.
scope.new_collection_from(1 .. 10).1
.map(|x| x / 3)
.threshold(|_,c| c % 2);
});
pub fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
    self,
    name: &str,
    thresh: F,
) -> Collection<G, K, R2>
A threshold with the ability to name the operator.
impl<G, K, R> Collection<G, K, R>
pub fn count(self) -> Collection<G, (K, R), isize>
Counts the number of occurrences of each element.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
// report the number of occurrences of each key
scope.new_collection_from(1 .. 10).1
.map(|x| x / 3)
.count();
});
pub fn count_core<R2: Ord + Abelian + From<i8> + 'static>(
    self,
) -> Collection<G, (K, R), R2>
Count for general integer differences.
This method allows count to produce collections whose difference
type is something other than an isize integer, for example perhaps an
i32.
impl<G, D, R> Collection<G, D, R>
where
    G: Scope<Timestamp: Clone + 'static + Lattice>,
    D: ExchangeData + Hashable,
    R: ExchangeData + Semigroup,
Methods which require data be arrangeable.
pub fn consolidate(self) -> Self
Aggregates the weights of equal records into at most one record.
This method uses the type D’s hashed() method to partition the data. The data are
accumulated in place, each held back until their timestamp has completed.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(1 .. 10u32).1;
x.clone()
.negate()
.concat(x)
.consolidate() // <-- ensures cancellation occurs
.assert_empty();
});
pub fn consolidate_named<Ba, Bu, Tr, F>(self, name: &str, reify: F) -> Self
As consolidate but with the ability to name the operator, specify the trace type,
and provide the function reify to produce owned keys and values.
pub fn consolidate_stream(self) -> Self
Aggregates the weights of equal records.
Unlike consolidate, this method does not exchange data and does not
ensure that at most one copy of each (data, time) pair exists in the
results. Instead, it acts on each batch of data and collapses equivalent
(data, time) pairs found therein, suppressing any that accumulate to
zero.
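The difference between the two methods can be illustrated with a plain-Rust model in which the input arrives as a list of batches. This is a sketch only: the names are hypothetical, times are `u64`, differences are `isize`, and real batch boundaries depend on how the system executes.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of one update: (data, time, diff).
type Update<D> = (D, u64, isize);

/// Sums the diffs of equal (data, time) pairs and drops pairs that sum to zero.
fn consolidate_batch<D: Ord>(batch: Vec<Update<D>>) -> Vec<Update<D>> {
    let mut sums: BTreeMap<(D, u64), isize> = BTreeMap::new();
    for (data, time, diff) in batch {
        *sums.entry((data, time)).or_insert(0) += diff;
    }
    sums.into_iter()
        .filter(|(_, diff)| *diff != 0)
        .map(|((data, time), diff)| (data, time, diff))
        .collect()
}

/// Models `consolidate_stream`: each batch is consolidated on its own.
fn consolidate_stream_model<D: Ord>(batches: Vec<Vec<Update<D>>>) -> Vec<Update<D>> {
    batches
        .into_iter()
        .flat_map(|batch| consolidate_batch(batch))
        .collect()
}

/// Models `consolidate`: all updates are brought together before consolidating.
fn consolidate_model<D: Ord>(batches: Vec<Vec<Update<D>>>) -> Vec<Update<D>> {
    consolidate_batch(batches.into_iter().flatten().collect())
}
```

When an insertion and its retraction land in different batches, the per-batch model emits both updates, while the global model emits nothing.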
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(1 .. 10u32).1;
// nothing to assert, as no particular guarantees.
x.clone()
.negate()
.concat(x)
.consolidate_stream();
});
impl<G, K: ExchangeData + Hashable, V: ExchangeData, R: ExchangeData + Semigroup> Collection<G, (K, V), R>
pub fn arrange_by_key(
    self,
) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
Arranges a collection of (Key, Val) records by Key.
This operator arranges a stream of values into a shared trace, whose contents it maintains. This trace is current for all times completed by the output stream, which can be used to safely identify the stable times and values in the trace.
pub fn arrange_by_key_named(
    self,
    name: &str,
) -> Arranged<G, TraceAgent<ValSpine<K, V, G::Timestamp, R>>>
As arrange_by_key but with the ability to name the arrangement.
impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Collection<G, K, R>
pub fn arrange_by_self(
    self,
) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
Arranges a collection of Key records by Key.
This operator arranges a collection of records into a shared trace, whose contents it maintains. This trace is current for all times complete in the output stream, which can be used to safely identify the stable times and values in the trace.
pub fn arrange_by_self_named(
    self,
    name: &str,
) -> Arranged<G, TraceAgent<KeySpine<K, G::Timestamp, R>>>
As arrange_by_self but with the ability to name the arrangement.
impl<G, K, V, R> Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice + Ord>,
    K: ExchangeData + Hashable,
    V: ExchangeData,
    R: ExchangeData + Semigroup,
pub fn join<V2, R2>(
    self,
    other: Collection<G, (K, V2), R2>,
) -> Collection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
where
    K: ExchangeData,
    V2: ExchangeData,
    R2: ExchangeData + Semigroup,
    R: Multiply<R2, Output: Semigroup + 'static>,
Matches pairs (key,val1) and (key,val2) based on key and yields pairs (key, (val1, val2)).
The join_map method may be more convenient for non-trivial processing pipelines.
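The matching rule, including how differences combine, can be sketched in plain Rust for one snapshot of each input. `join_model` is a hypothetical name, differences are assumed to be `isize`, and the nested loop is chosen for clarity rather than efficiency. The library itself works on arranged (indexed) data.

```rust
/// Models `join` on one snapshot: every pair of records with equal keys produces
/// `(key, (val1, val2))`, with the two differences multiplied together.
fn join_model<K, V1, V2>(
    left: &[((K, V1), isize)],
    right: &[((K, V2), isize)],
) -> Vec<((K, (V1, V2)), isize)>
where
    K: PartialEq + Clone,
    V1: Clone,
    V2: Clone,
{
    let mut output = Vec::new();
    for ((k1, v1), d1) in left {
        for ((k2, v2), d2) in right {
            if k1 == k2 {
                output.push(((k1.clone(), (v1.clone(), v2.clone())), d1 * d2));
            }
        }
    }
    output
}
```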
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
x.join(y)
.assert_eq(z);
});
pub fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
    self,
    other: Collection<G, (K, V2), R2>,
    logic: L,
) -> Collection<G, D, <R as Multiply<R2>>::Output>
Matches pairs (key,val1) and (key,val2) based on key and then applies a function.
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
x.join_map(y, |_key, &a, &b| (a,b))
.assert_eq(z);
});
pub fn semijoin<R2: ExchangeData + Semigroup>(
    self,
    other: Collection<G, K, R2>,
) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
Matches pairs (key, val) and key based on key, producing the former with frequencies multiplied.
When the second collection contains frequencies that are either zero or one this is the more traditional relational semijoin. When the second collection may contain multiplicities, this operation may scale up the counts of the records in the first input.
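The scaling effect can be seen in a small plain-Rust model of one snapshot. This is a sketch under assumptions: `semijoin_model` is a hypothetical name and both difference types are taken to be `isize`.

```rust
use std::collections::BTreeMap;

/// Models `semijoin` on one snapshot: each `(key, val)` record is kept with its
/// difference multiplied by the accumulated count of `key` in `keys`.
fn semijoin_model<K, V>(pairs: &[((K, V), isize)], keys: &[(K, isize)]) -> Vec<((K, V), isize)>
where
    K: Ord + Clone,
    V: Clone,
{
    let mut key_counts: BTreeMap<K, isize> = BTreeMap::new();
    for (key, diff) in keys {
        *key_counts.entry(key.clone()).or_insert(0) += *diff;
    }
    pairs
        .iter()
        .filter_map(|((key, val), diff)| {
            // Keys missing from `keys` have count zero and produce nothing.
            let count = *key_counts.get(key)?;
            let product = diff * count;
            if product != 0 {
                Some(((key.clone(), val.clone()), product))
            } else {
                None
            }
        })
        .collect()
}
```

With a key count of one the record passes through unchanged, and with a key count of three the record's count is tripled.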
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
let y = scope.new_collection_from(vec![0, 2]).1;
let z = scope.new_collection_from(vec![(0, 1)]).1;
x.semijoin(y)
.assert_eq(z);
});
pub fn antijoin<R2: ExchangeData + Semigroup>(
    self,
    other: Collection<G, K, R2>,
) -> Collection<G, (K, V), R>
Subtracts the semijoin with other from self.
In the case that other has multiplicities zero or one this results
in a relational antijoin, in which we discard input records whose key
is present in other. If the multiplicities could be other than zero
or one, the semantic interpretation of this operator is less clear.
In almost all cases, you should ensure that other has multiplicities
that are zero or one, perhaps by using the distinct operator.
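To see why multiplicities other than zero or one are awkward, here is a plain-Rust model of "self minus the semijoin" on one snapshot. It is a sketch with a hypothetical name (`antijoin_model`) and `isize` differences throughout, not the library's implementation.

```rust
use std::collections::BTreeMap;

/// Models `antijoin` on one snapshot: each record keeps
/// `diff - diff * count`, where `count` is the key's multiplicity in `keys`.
fn antijoin_model<K, V>(pairs: &[((K, V), isize)], keys: &[(K, isize)]) -> Vec<((K, V), isize)>
where
    K: Ord + Clone,
    V: Clone,
{
    let mut key_counts: BTreeMap<K, isize> = BTreeMap::new();
    for (key, diff) in keys {
        *key_counts.entry(key.clone()).or_insert(0) += *diff;
    }
    pairs
        .iter()
        .filter_map(|((key, val), diff)| {
            let count = key_counts.get(key).copied().unwrap_or(0);
            // Subtract the semijoin result from the original record.
            let remaining = diff - diff * count;
            if remaining != 0 {
                Some(((key.clone(), val.clone()), remaining))
            } else {
                None
            }
        })
        .collect()
}
```

A key with multiplicity one removes the record, as expected, but a key with multiplicity two leaves the record with a negative count, which is rarely what is intended.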
§Examples
use differential_dataflow::input::Input;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
let y = scope.new_collection_from(vec![0, 2]).1;
let z = scope.new_collection_from(vec![(1, 3)]).1;
x.antijoin(y)
.assert_eq(z);
});
pub fn join_core<Tr2, I, L>(
    self,
    stream2: Arranged<G, Tr2>,
    result: L,
) -> Collection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
Joins two arranged collections with the same key type.
Each matching pair of records (key, val1) and (key, val2) is passed to the result function,
which produces something implementing IntoIterator; the output collection has an entry for
every value the iterator returns.
This trait is implemented for arrangements (Arranged<G, T>) rather than collections. The Join trait
contains the implementations for collections.
§Examples
use differential_dataflow::input::Input;
use differential_dataflow::trace::Trace;
::timely::example(|scope| {
let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
.arrange_by_key();
let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
.arrange_by_key();
let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
x.join_core(y, |_key, &a, &b| Some((a, b)))
.assert_eq(z);
});
Trait Implementations§
impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> Arrange<G, Vec<((K, ()), <G as ScopeParent>::Timestamp, R)>> for Collection<G, K, R>
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
impl<G, K, V, R> Arrange<G, Vec<((K, V), <G as ScopeParent>::Timestamp, R)>> for Collection<G, (K, V), R>
where
    G: Scope<Timestamp: Lattice>,
    K: ExchangeData + Hashable,
    V: ExchangeData,
    R: ExchangeData + Semigroup,
fn arrange_named<Ba, Bu, Tr>(self, name: &str) -> Arranged<G, TraceAgent<Tr>>
impl<G, K: ExchangeData + Hashable, R: ExchangeData + Semigroup> CountTotal<G, K, R> for VecCollection<G, K, R>
fn count_total_core<R2: Semigroup + From<i8> + 'static>(
    self,
) -> VecCollection<G, (K, R), R2>
fn count_total(self) -> VecCollection<G, (K, R), isize>
impl<G, D, R> Identifiers<G, D, R> for VecCollection<G, D, R>
fn identifiers(self) -> VecCollection<G, (D, u64), R>
impl<G: Scope<Timestamp: Lattice>, D: Ord + Data + Debug, R: Abelian + 'static> Iterate<G, D, R> for VecCollection<G, D, R>
fn iterate<F>(self, logic: F) -> VecCollection<G, D, R>
where
    for<'a> F: FnOnce(Iterative<'a, G, u64>, VecCollection<Iterative<'a, G, u64>, D, R>) -> VecCollection<Iterative<'a, G, u64>, D, R>,
Iteratively applies logic to the source collection until convergence.
impl<G, K, D> PrefixSum<G, K, D> for VecCollection<G, ((usize, K), D)>
fn prefix_sum<F>(self, zero: D, combine: F) -> Self
fn prefix_sum_at<F>(
    self,
    locations: VecCollection<G, (usize, K)>,
    zero: D,
    combine: F,
) -> Self
Determines the prefix sum at each element of location.