Trait differential_dataflow::operators::consolidate::ConsolidateExt

pub trait ConsolidateExt<D: Data> {
    fn consolidate(&self) -> Self;
    fn consolidate_by<U: Unsigned, F: Fn(&D) -> U + 'static>(&self, part: F) -> Self;
}

Extension methods for consolidating weighted streams.

Required Methods

fn consolidate(&self) -> Self

Aggregates the weights of equal records into at most one record.

This method uses the type D's hashed() method to partition the data.

Examples

In the following fragment, result contains only (1, 3):

let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
let collection = Collection::new(stream);
let result = collection.consolidate();
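
The arithmetic behind that result can be checked without any dataflow machinery. The following is a minimal, std-only sketch of the consolidation semantics described above (sum the weights of equal records, drop records whose weights cancel). The name consolidate_sketch and the i32 weight type are assumptions made for illustration; this is not the library's implementation and it performs no partitioning.

```rust
/// Hypothetical helper, not part of differential_dataflow: sums the weights
/// of equal records and drops records whose total weight is zero.
fn consolidate_sketch<D: Ord>(mut records: Vec<(D, i32)>) -> Vec<(D, i32)> {
    // Sort by record so that equal records become adjacent.
    records.sort_by(|a, b| a.0.cmp(&b.0));

    let mut result: Vec<(D, i32)> = Vec::new();
    for (record, weight) in records {
        // If the previous entry holds the same record, accumulate into it.
        if let Some(last) = result.last_mut() {
            if last.0 == record {
                last.1 += weight;
                continue;
            }
        }
        // Otherwise this is the first occurrence of the record.
        result.push((record, weight));
    }

    // Records whose weights cancel out are removed entirely.
    result.retain(|entry| entry.1 != 0);
    result
}
```

Applied to the input of the fragment above, record 0 accumulates 1 + (-1) = 0 and is dropped, while record 1 accumulates 1 + 2 = 3, leaving only (1, 3).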

fn consolidate_by<U: Unsigned, F: Fn(&D) -> U + 'static>(&self, part: F) -> Self

Aggregates the weights of equal records into at most one record, partitioning the data using the supplied partition function.

Note that consolidate_by attempts to aggregate weights as it goes, so that it does not consume more memory than its collection requires. It aggregates among blocks of records with the same part value, so if you set every part value to the same value, it may not do a great job, because you will have many blocks with distinct values. Bear that in mind if you want to be clever.

Examples

In the following fragment, result contains only (1, 3):

let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
let collection = Collection::new(stream);
let result = collection.consolidate_by(|&x| x);
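
To make the role of the partition function more concrete, here is a minimal, std-only sketch that routes each record to one of several buckets by its part value and then consolidates each bucket independently. Everything in it is an assumption made for illustration: the name consolidate_by_sketch, the peers parameter, the use of u64 in place of the Unsigned bound, and the "part value modulo peers" routing rule. It does not model the block-wise buffering mentioned in the note on consolidate_by, and it is not the library's implementation.

```rust
use std::collections::BTreeMap;

/// Hypothetical helper, not part of differential_dataflow: routes each record
/// to one of `peers` buckets using `part`, then consolidates every bucket.
/// Assumes `peers > 0`.
fn consolidate_by_sketch<D: Ord, F: Fn(&D) -> u64>(
    records: Vec<(D, i32)>,
    peers: usize,
    part: F,
) -> Vec<Vec<(D, i32)>> {
    assert!(peers > 0, "at least one bucket is required");

    // One accumulator per bucket, keyed by record.
    let mut buckets: Vec<BTreeMap<D, i32>> =
        (0..peers).map(|_| BTreeMap::new()).collect();

    for (record, weight) in records {
        // Assumed routing rule: part value modulo the number of buckets.
        let index = (part(&record) % peers as u64) as usize;
        *buckets[index].entry(record).or_insert(0) += weight;
    }

    // Drop records whose weights cancelled, keeping each bucket separate.
    buckets
        .into_iter()
        .map(|bucket| bucket.into_iter().filter(|entry| entry.1 != 0).collect())
        .collect()
}
```

Because equal records always produce the same part value, they always land in the same bucket, so their weights can be summed there. With two buckets and the identity partition function, the input of the fragment above leaves the first bucket empty and the second holding only (1, 3). A constant partition function such as |_| 0 would instead send every record to a single bucket.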

Implementors