//! General purpose intra-timestamp aggregation

use std::hash::Hash;
use std::collections::HashMap;

use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

/// Generic intra-timestamp aggregation
///
/// Extension method supporting aggregation of keyed data within each timestamp.
/// For inter-timestamp aggregation, consider `StateMachine`.
pub trait Aggregate<S: Scope, K: ExchangeData+Hash, V: ExchangeData> {
    /// Aggregates data of the form `(key, val)`, using user-supplied logic.
    ///
    /// The `aggregate` method is implemented for streams of `(K, V)` data,
    /// and takes functions `fold`, `emit`, and `hash`, used to combine new `V`
    /// data with existing `D` state, to produce `R` output from `D` state, and
    /// to route `K` keys, respectively.
    ///
    /// Aggregation happens within each time, and results are produced once the
    /// time is complete.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::Aggregate;
    ///
    /// timely::example(|scope| {
    ///
    ///     (0..10).to_stream(scope)
    ///         .map(|x| (x % 2, x))
    ///         .aggregate(
    ///             |_key, val, agg| { *agg += val; },
    ///             |key, agg: i32| (key, agg),
    ///             |key| *key as u64
    ///         )
    ///         .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
    /// });
    /// ```
    ///
    /// By changing the type of the aggregate value, one can accumulate into different types.
    /// Here we accumulate the data into a `Vec<i32>` and report its length (which we could
    /// obviously do more efficiently; imagine we were computing a hash instead).
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::Aggregate;
    ///
    /// timely::example(|scope| {
    ///
    ///     (0..10).to_stream(scope)
    ///         .map(|x| (x % 2, x))
    ///         .aggregate::<_,Vec<i32>,_,_,_>(
    ///             |_key, val, agg| { agg.push(val); },
    ///             |key, agg| (key, agg.len()),
    ///             |key| *key as u64
    ///         )
    ///         .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
    /// });
    /// ```
    fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
        &self,
        fold: F,
        emit: E,
        hash: H) -> Stream<S, R> where S::Timestamp: Eq;
}

impl<S: Scope, K: ExchangeData+Hash+Eq, V: ExchangeData> Aggregate<S, K, V> for Stream<S, (K, V)> {

    fn aggregate<R: Data, D: Default+'static, F: Fn(&K, V, &mut D)+'static, E: Fn(K, D)->R+'static, H: Fn(&K)->u64+'static>(
        &self,
        fold: F,
        emit: E,
        hash: H) -> Stream<S, R> where S::Timestamp: Eq {

        let mut aggregates = HashMap::new();
        let mut vector = Vec::new();
        self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

            // Read each input batch, folding records into per-time, per-key state.
            input.for_each(|time, data| {
                data.swap(&mut vector);
                let agg_time = aggregates.entry(time.time().clone()).or_insert_with(HashMap::new);
                for (key, val) in vector.drain(..) {
                    let agg = agg_time.entry(key.clone()).or_insert_with(Default::default);
                    fold(&key, val, agg);
                }
                // Request a notification once this time is complete.
                notificator.notify_at(time.retain());
            });

            // Pop aggregates for completed times and emit their results.
            notificator.for_each(|time,_,_| {
                if let Some(aggs) = aggregates.remove(time.time()) {
                    let mut session = output.session(&time);
                    for (key, agg) in aggs {
                        session.give(emit(key, agg));
                    }
                }
            });
        })
    }
}
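The fold-then-emit cycle inside the operator can be illustrated outside a dataflow context with a std-only sketch. The `aggregate_time` function and the plain `u32` timestamp below are illustrative stand-ins (not part of timely's API): records are folded into nested per-time, per-key state, and results for a time are drained only once that time is declared complete, mirroring the notificator-driven emission above.

```rust
use std::collections::HashMap;

/// Fold `(time, key, val)` records into per-timestamp, per-key sums, then
/// emit sorted `(key, sum)` pairs for the requested completed time — a
/// std-only stand-in for the operator's fold/notify/emit cycle.
fn aggregate_time(records: Vec<(u32, i32, i32)>, complete: u32) -> Vec<(i32, i32)> {
    // aggregates[time][key] -> folded state, like the operator's nested HashMap.
    let mut aggregates: HashMap<u32, HashMap<i32, i32>> = HashMap::new();

    // `fold`: combine each new value with existing per-key state.
    for (time, key, val) in records {
        *aggregates.entry(time).or_default().entry(key).or_default() += val;
    }

    // `emit`: once `complete` is finished, drain its aggregates and report them.
    let mut out: Vec<(i32, i32)> = aggregates
        .remove(&complete)
        .unwrap_or_default()
        .into_iter()
        .collect();
    out.sort();
    out
}

fn main() {
    // Mirror the doc example: keys x % 2 over 0..10, all at time 0.
    let records: Vec<(u32, i32, i32)> = (0..10).map(|x| (0u32, x % 2, x)).collect();
    assert_eq!(aggregate_time(records, 0), vec![(0, 20), (1, 25)]);
}
```

Note that, as in the operator, state for a time is removed (not merely read) at emission, so each completed time is reported exactly once.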