1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
//! General purpose intra-timestamp aggregation
use std::hash::Hash;
use std::collections::HashMap;

use crate::{Data, ExchangeData};
use crate::dataflow::{Stream, Scope};
use crate::dataflow::operators::generic::operator::Operator;
use crate::dataflow::channels::pact::Exchange;

/// Generic intra-timestamp aggregation
///
/// Extension method supporting aggregation of keyed data within timestamp.
/// For inter-timestamp aggregation, consider `StateMachine`.
pub trait Aggregate<S, K, V>
where
    S: Scope,
    K: ExchangeData + Hash,
    V: ExchangeData,
{
    /// Folds keyed `(key, val)` records into per-key state, separately for each time.
    ///
    /// The operator is driven by three user-supplied closures:
    ///
    /// * `fold` merges an incoming `V` value into the `D` state kept for its key
    ///   (the state starts out as `D::default()`);
    /// * `emit` turns a key together with its final `D` state into an output `R`;
    /// * `hash` maps a key to a `u64` that decides how records are routed.
    ///
    /// State is accumulated independently for every time, and the outputs for a
    /// time are only produced once that time is complete.
    ///
    /// # Examples
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::Aggregate;
    ///
    /// timely::example(|scope| {
    ///
    ///     (0..10).to_stream(scope)
    ///         .map(|x| (x % 2, x))
    ///         .aggregate(
    ///             |_key, val, agg| { *agg += val; },
    ///             |key, agg: i32| (key, agg),
    ///             |key| *key as u64
    ///         )
    ///         .inspect(|x| assert!(*x == (0, 20) || *x == (1, 25)));
    /// });
    /// ```
    ///
    /// The state type `D` is free to choose. Here the values are collected into a
    /// `Vec<i32>` and only its length is reported (wasteful, but it shows that the
    /// accumulated state can be richer than the output).
    ///
    /// ```
    /// use timely::dataflow::operators::{ToStream, Map, Inspect};
    /// use timely::dataflow::operators::aggregation::Aggregate;
    ///
    /// timely::example(|scope| {
    ///
    ///     (0..10).to_stream(scope)
    ///         .map(|x| (x % 2, x))
    ///         .aggregate::<_,Vec<i32>,_,_,_>(
    ///             |_key, val, agg| { agg.push(val); },
    ///             |key, agg| (key, agg.len()),
    ///             |key| *key as u64
    ///         )
    ///         .inspect(|x| assert!(*x == (0, 5) || *x == (1, 5)));
    /// });
    /// ```
    fn aggregate<R, D, F, E, H>(&self, fold: F, emit: E, hash: H) -> Stream<S, R>
    where
        R: Data,
        D: Default + 'static,
        F: Fn(&K, V, &mut D) + 'static,
        E: Fn(K, D) -> R + 'static,
        H: Fn(&K) -> u64 + 'static,
        S::Timestamp: Eq;
}

impl<S, K, V> Aggregate<S, K, V> for Stream<S, (K, V)>
where
    S: Scope,
    K: ExchangeData + Hash + Eq,
    V: ExchangeData,
{
    fn aggregate<R, D, F, E, H>(&self, fold: F, emit: E, hash: H) -> Stream<S, R>
    where
        R: Data,
        D: Default + 'static,
        F: Fn(&K, V, &mut D) + 'static,
        E: Fn(K, D) -> R + 'static,
        H: Fn(&K) -> u64 + 'static,
        S::Timestamp: Eq,
    {
        // Open aggregations: time -> (key -> accumulated state).
        let mut pending = HashMap::new();
        // Scratch buffer that input batches are swapped into, reused across calls.
        let mut buffer = Vec::new();

        // Records are exchanged by `hash(key)`, so every record for a given key
        // arrives at the same operator instance.
        self.unary_notify(Exchange::new(move |&(ref k, _)| hash(k)), "Aggregate", vec![], move |input, output, notificator| {

            // Fold every incoming record into the state for its (time, key), and
            // ask to be notified once that time is complete.
            input.for_each(|cap, data| {
                data.swap(&mut buffer);
                let per_key = pending.entry(cap.time().clone()).or_insert_with(HashMap::new);
                buffer.drain(..).for_each(|(key, val)| {
                    let state = per_key.entry(key.clone()).or_default();
                    fold(&key, val, state);
                });
                notificator.notify_at(cap.retain());
            });

            // For each completed time, emit one output per key. A session is only
            // opened when state exists for that time. Iteration order over keys
            // follows the `HashMap`, so it is unspecified.
            notificator.for_each(|cap, _, _| {
                if let Some(finished) = pending.remove(cap.time()) {
                    let mut session = output.session(&cap);
                    for (key, state) in finished {
                        session.give(emit(key, state));
                    }
                }
            });
        })
    }
}