Trait timely::dataflow::operators::aggregation::aggregate::Aggregate
[−]
[src]
pub trait Aggregate<S: Scope, K: ExchangeData + Hash, V: ExchangeData> { fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R>
where
S::Timestamp: Hash + Eq; }
Generic intra-timestamp aggregation
Extension method supporting aggregation of keyed data within timestamp.
For inter-timestamp aggregation, consider StateMachine
.
Required Methods
fn aggregate<R: Data, D: Default + 'static, F: Fn(&K, V, &mut D) + 'static, G: Fn(K, D) -> R + 'static, H: Fn(&K) -> u64 + 'static>(
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
&self,
fold: F,
emit: G,
hash: H
) -> Stream<S, R> where
S::Timestamp: Hash + Eq,
Aggregates data of the form (key, val)
, using user-supplied logic.
The aggregate
method is implemented for streams of (K, V)
data,
and takes functions fold
, emit
, and hash
; used to combine new V
data with existing D
state, to produce R
output from D
state, and
to route K
keys, respectively.
Aggregation happens within each time, and results are produced once the time is complete.
Examples
use timely::dataflow::operators::{ToStream, Map, Inspect}; use timely::dataflow::operators::aggregation::Aggregate; timely::example(|scope| { (0..10).to_stream(scope) .map(|x| (x % 2, x)) .aggregate( |_key, val, agg| { *agg += val; }, |key, agg: i32| (key, agg), |key| *key as u64 ) .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25))); });
By changing the type of the aggregate value, one can accumulate into different types.
Here we accumulate the data into a Vec<i32>
and report its length (which we could
obviously do more efficiently; imagine we were doing a hash instead).
use timely::dataflow::operators::{ToStream, Map, Inspect}; use timely::dataflow::operators::aggregation::Aggregate; timely::example(|scope| { (0..10).to_stream(scope) .map(|x| (x % 2, x)) .aggregate::<_,Vec<i32>,_,_,_>( |_key, val, agg| { agg.push(val); }, |key, agg| (key, agg.len()), |key| *key as u64 ) .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5))); });
Implementors
impl<S: Scope, K: ExchangeData + Hash + Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)>