Trait differential_dataflow::operators::consolidate::ConsolidateExt
pub trait ConsolidateExt<D: Data> {
    fn consolidate(&self) -> Self;
    fn consolidate_by<U: Unsigned, F: Fn(&D) -> U + 'static>(&self, part: F) -> Self;
}
An extension method for consolidating weighted streams.
Required Methods
fn consolidate(&self) -> Self
Aggregates the weights of equal records into at most one record.
This method uses the type D's hashed() method to partition the data.
Examples
In the following fragment, result contains only (1, 3):
let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
let collection = Collection::new(stream);
let result = collection.consolidate();
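To make the stated result concrete, here is a minimal standalone sketch of what consolidation means for a batch of (record, weight) pairs. It is not the library's implementation: consolidate_vec is a hypothetical helper that works on a plain Vec, and the weight type i32 is an assumption made for illustration.

```rust
/// Sums the weights of equal records and drops records whose total is zero.
/// Hypothetical helper for illustration; not part of differential_dataflow.
fn consolidate_vec<D: Ord>(mut updates: Vec<(D, i32)>) -> Vec<(D, i32)> {
    // Sorting brings equal records next to each other.
    updates.sort_by(|a, b| a.0.cmp(&b.0));

    let mut result: Vec<(D, i32)> = Vec::new();
    for (record, weight) in updates {
        // Same record as the previous entry: accumulate its weight.
        if let Some(last) = result.last_mut() {
            if last.0 == record {
                last.1 += weight;
                continue;
            }
        }
        result.push((record, weight));
    }

    // Records whose weights cancel out disappear from the output.
    result.retain(|(_, weight)| *weight != 0);
    result
}
```

Applied to the input above, the two updates for record 0 cancel (1 + -1 = 0) and the two updates for record 1 add up to 3, which is why only (1, 3) remains.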
fn consolidate_by<U: Unsigned, F: Fn(&D) -> U + 'static>(&self, part: F) -> Self
Aggregates the weights of equal records into at most one record, partitioning the data using the supplied partition function.
Note that consolidate_by attempts to aggregate weights as it goes, to ensure that it does not consume more memory than its collection requires. It does this among blocks of records with the same part value, so if you just set all part values to the same value, it may not do a great job because you'll have lots of blocks with distinct values. Bear that in mind if you want to be clever.
Examples
In the following fragment, result contains only (1, 3):
let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
let collection = Collection::new(stream);
let result = collection.consolidate_by(|&x| x);
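The role of the partition function can be sketched outside the dataflow setting. The following is an illustration under stated assumptions, not the library's implementation: consolidate_by_vec is a hypothetical helper, the partition value is fixed to u64 rather than a generic Unsigned type, weights are i32, and blocks are modelled as entries of a BTreeMap. Equal records always map to the same part value, so they always land in the same block and can be merged there.

```rust
use std::collections::BTreeMap;

/// Groups updates into blocks by `part`, then consolidates each block on its own.
/// Hypothetical helper for illustration; not part of differential_dataflow.
fn consolidate_by_vec<D: Ord, F: Fn(&D) -> u64>(
    updates: Vec<(D, i32)>,
    part: F,
) -> Vec<(D, i32)> {
    // One block per distinct partition value.
    let mut blocks: BTreeMap<u64, Vec<(D, i32)>> = BTreeMap::new();
    for (record, weight) in updates {
        blocks.entry(part(&record)).or_default().push((record, weight));
    }

    let mut result = Vec::new();
    for (_, mut block) in blocks {
        // Within a block, sort so that equal records are adjacent.
        block.sort_by(|a, b| a.0.cmp(&b.0));
        let mut merged: Vec<(D, i32)> = Vec::new();
        for (record, weight) in block {
            if let Some(last) = merged.last_mut() {
                if last.0 == record {
                    last.1 += weight;
                    continue;
                }
            }
            merged.push((record, weight));
        }
        // Drop records whose weights cancelled out.
        merged.retain(|(_, weight)| *weight != 0);
        result.extend(merged);
    }
    result
}
```

In this sketch the choice of part changes only how the work is grouped, not the final contents: calling it with |&x| x or with a constant such as |_| 0 on the input above both yield just (1, 3).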
Implementors
impl<G: Scope, D: Ord + Data + Debug> ConsolidateExt<D> for Collection<G, D>