[][src]Trait differential_dataflow::operators::consolidate::ConsolidateStream

pub trait ConsolidateStream<D: ExchangeData + Hashable> {
    fn consolidate_stream(&self) -> Self;
}

An extension method for consolidating weighted streams.

Required methods

fn consolidate_stream(&self) -> Self

Aggregates the weights of equal records.

Unlike consolidate, this method does not exchange data and does not ensure that at most one copy of each (data, time) pair exists in the results. Instead, it acts on each batch of data and collapses equivalent (data, time) pairs found therein, suppressing any that accumulate to zero.

Examples

extern crate timely;
extern crate differential_dataflow;

use differential_dataflow::input::Input;
use differential_dataflow::operators::consolidate::ConsolidateStream;

fn main() {
    ::timely::example(|scope| {

        let x = scope.new_collection_from(1 .. 10u32).1;

        // nothing to assert, as no particular guarantees.
        x.negate()
         .concat(&x)
         .consolidate_stream();
    });
}
Loading content...

Implementors

impl<G: Scope, D, R> ConsolidateStream<D> for Collection<G, D, R> where
    D: ExchangeData + Hashable,
    R: ExchangeData + Semigroup,
    G::Timestamp: Lattice + Ord
[src]

Loading content...