Trait timely::dataflow::operators::aggregation::aggregate::Aggregate [] [src]

pub trait Aggregate<S: Scope, K: Data + Hash, V: Data> {
    fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(&self, fold: F, emit: G, hash: H) -> Stream<S, R> where S::Timestamp: Hash + Eq;
}

Generic intra-timestamp aggregation

Extension method supporting aggregation of keyed data within timestamp. For inter-timestamp aggregation, consider StateMachine.

Required Methods

fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(&self, fold: F, emit: G, hash: H) -> Stream<S, R> where S::Timestamp: Hash + Eq

Aggregates data of the form (key, val), using user-supplied logic.

The aggregate method is implemented for streams of (K, V) data, and takes functions fold, emit, and hash; used to combine new V data with existing D state, to produce R output from D state, and to route K keys, respectively.

Aggregation happens within each time, and results are produced once the time is complete.

Examples

use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::Aggregate;

timely::example(|scope| {

    (0..10).to_stream(scope)
           .map(|x| (x % 2, x))
           .aggregate(
               |_key, val, agg| { *agg += val; }, 
               |key, agg: i32| (key, agg), 
               |key| *key as u64
           )
           .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
});

By changing the type of the aggregate value, one can accumulate into different types. Here we accumulate the data into a Vec<i32> and report its length (which we could obviously do more efficiently; imagine we were doing a hash instead).

use timely::dataflow::operators::{ToStream, Map, Inspect};
use timely::dataflow::operators::aggregation::Aggregate;

timely::example(|scope| {

    (0..10).to_stream(scope)
           .map(|x| (x % 2, x))
           .aggregate::<_,Vec<i32>,_,_,_>(
               |_key, val, agg| { agg.push(val); }, 
               |key, agg| (key, agg.len()), 
               |key| *key as u64
           )
           .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
});

Implementors