1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
//! Group records by a key, and apply a reduction function.
//!
//! The `group` operators act on data that can be viewed as pairs `(key, val)`. They group records
//! with the same key and apply a user-supplied function to each key and its list of values; the
//! function is expected to populate a list of output values.
//!
//! Several variants of `group` exist which allow more precise control over how grouping is done.
//! For example, the `_by` suffixed variants take arbitrary data, but require a key-value selector
//! to be applied to each record. The `_u` suffixed variants use unsigned integers as keys, and
//! will use a dense array rather than a `HashMap` to store their keys.
//!
//! The list of values is presented as an iterator that internally merges sorted lists of values.
//! This ordering can be exploited in several cases to avoid computation when only the first few
//! elements are required.
//!
//! # Examples
//!
//! This example groups a stream of `(key,val)` pairs by `key`, and yields only the most frequently
//! occurring value for each key.
//!
//! ```ignore
//! stream.group(|key, vals, output| {
//!     let (mut max_val, mut max_wgt) = vals.next().unwrap();
//!     for (val, wgt) in vals {
//!         if wgt > max_wgt {
//!             max_wgt = wgt;
//!             max_val = val;
//!         }
//!     }
//!     output.push((max_val.clone(), max_wgt));
//! })
//! ```

use std::rc::Rc;
use std::cell::RefCell;
use std::default::Default;
use std::collections::HashMap;

use itertools::Itertools;
use linear_map::LinearMap;
use vec_map::VecMap;

use ::{Data, Collection, Delta};
use timely::dataflow::*;
use timely::dataflow::operators::Unary;
use timely::dataflow::channels::pact::Pipeline;
use timely_sort::Unsigned;

use collection::{LeastUpperBound, Lookup, Trace, BasicTrace, Offset};
use collection::trace::CollectionIterator;
use collection::basic::DifferenceIterator;
use collection::compact::Compact;

use iterators::coalesce::Coalesce;

use operators::arrange::{Arranged, ArrangeByKey};

/// Extension trait providing the `group` differential dataflow method.
pub trait Group<G, K, V>
where
    G: Scope,
    K: Data,
    V: Data,
    G::Timestamp: LeastUpperBound,
{
    /// Groups `(key, val)` records by key and applies `logic` to each key and its values.
    ///
    /// `logic` is handed the key, an iterator over that key's `(val, weight)` pairs, and an
    /// output buffer into which it pushes `(val2, weight)` pairs describing the key's output.
    ///
    /// Ideally this would be generic over the arrangement backing the data, but attempts to
    /// express that have ICE'd the compiler. It is also unclear how the arrangement type would be
    /// specified: it only shows up through `logic`'s iterator types, which is not enough to drive
    /// type inference. It would likewise be convenient for a single implementation to serve both
    /// `(K, V)` pairs and bare `K` elements treated as `(K, ())` pairs.
    fn group<L, V2: Data>(&self, logic: L) -> Collection<G, (K, V2)>
    where
        L: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, Delta)>) + 'static;
}

impl<G, K, V> Group<G, K, V> for Collection<G, (K, V)>
where
    G: Scope,
    K: Data + Default,
    V: Data + Default,
    G::Timestamp: LeastUpperBound,
{
    /// Arranges the collection by key and groups the arrangement.
    ///
    /// Both the input arrangement and the grouped output key their traces through `HashMap`
    /// lookups, ordering keys by `hashed()`.
    fn group<L, V2: Data>(&self, logic: L) -> Collection<G, (K, V2)>
    where
        L: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, Delta)>) + 'static,
    {
        let arranged = self.arrange_by_key(|k| k.hashed(), |_| HashMap::new());
        let grouped = arranged.group(|k| k.hashed(), |_| HashMap::new(), logic);
        grouped.as_collection()
    }
}

/// Counts the number of occurrences of each element.
pub trait Count<G, K>
where
    G: Scope,
    K: Data,
    G::Timestamp: LeastUpperBound,
{
    /// Produces one `(element, count)` record for each distinct element of the collection.
    fn count(&self) -> Collection<G, (K, Delta)>;
}

impl<G, K> Count<G, K> for Collection<G, K>
where
    G: Scope,
    K: Data + Default,
    G::Timestamp: LeastUpperBound,
{
    /// Counts elements by grouping each element with a unit value.
    ///
    /// Every group holds the single value `()`, so its weight is the element's count; that weight
    /// is emitted as the output value with weight 1.
    fn count(&self) -> Collection<G, (K, Delta)> {
        let keyed = self.map(|record| (record, ()));
        keyed.group(|_key, vals, out| {
            let (_, weight) = vals.next().unwrap();
            out.push((weight, 1));
        })
    }
}

/// Extension trait for the `group_u` differential dataflow method.
pub trait GroupUnsigned<G: Scope, U: Unsigned+Data+Default, V: Data>
    where G::Timestamp: LeastUpperBound {
    /// Groups records by their first field, when this field implements `Unsigned`.
    ///
    /// This method uses a `Vec<Option<_>>` as its internal storage, allocating
    /// enough memory to directly index based on the first fields. This can be
    /// very useful when these fields are small integers, but it can be expensive
    /// if they are large and sparse.
    fn group_u<L, V2: Data>(&self, logic: L) -> Collection<G, (U, V2)>
        where L: Fn(&U, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>)+'static;
}

impl<G: Scope, U: Unsigned+Data+Default, V: Data> GroupUnsigned<G, U, V> for Collection<G, (U,V)>
where G::Timestamp: LeastUpperBound {
    fn group_u<L, V2: Data>(&self, logic: L) -> Collection<G, (U, V2)>
        where L: Fn(&U, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>)+'static {
            self.arrange_by_key(|k| k.as_u64(), |x| (VecMap::new(), x))
                .group(|k| k.as_u64(), |x| (VecMap::new(), x), logic)
                .as_collection()
        }
}


/// Counts the number of occurrences of each unsigned element.
pub trait CountUnsigned<G, U>
where
    G: Scope,
    U: Unsigned + Data + Default,
    G::Timestamp: LeastUpperBound,
{
    /// Produces one `(element, count)` record for each distinct unsigned element, using the
    /// dense `group_u` machinery.
    fn count_u(&self) -> Collection<G, (U, Delta)>;
}

impl<G, U> CountUnsigned<G, U> for Collection<G, U>
where
    G: Scope,
    U: Unsigned + Data + Default,
    G::Timestamp: LeastUpperBound,
{
    /// Counts unsigned elements by pairing each with `()` and grouping with `group_u`.
    ///
    /// The weight of the single `()` value in each group is the element's count. It is emitted as
    /// the output value with weight 1.
    fn count_u(&self) -> Collection<G, (U, Delta)> {
        let keyed = self.map(|record| (record, ()));
        keyed.group_u(|_key, vals, out| {
            let (_, weight) = vals.next().unwrap();
            out.push((weight, 1));
        })
    }
}

/// Extension trait providing the `group` operator on `Arranged<_>` data.
pub trait GroupArranged<G, K, V>
where
    G: Scope,
    K: Data,
    V: Data,
{
    /// Groups already-arranged data and returns the results as a new arrangement.
    ///
    /// * `key_h` maps each key to an unsigned value used to order keys during processing.
    /// * `look` builds the lookup structure of the output trace from a `u64` argument.
    /// * `logic` receives each key, an iterator over that key's values, and a buffer to fill with
    ///   `(output value, weight)` pairs.
    ///
    /// `Collection`'s `group` and `group_u` methods are built on this method. It can also be
    /// called directly by code that wants the arranged output, which helps when the results are
    /// immediately re-used with the same keys.
    fn group<V2, U, KH, Look, LookG, Logic>(&self, key_h: KH, look: LookG, logic: Logic)
        -> Arranged<G, BasicTrace<K, G::Timestamp, V2, Look>>
    where
        G::Timestamp: LeastUpperBound,
        V2: Data,
        U: Unsigned + Default,
        KH: Fn(&K) -> U + 'static,
        Look: Lookup<K, Offset> + 'static,
        LookG: Fn(u64) -> Look,
        Logic: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>) + 'static;
}

// Grouping for arrangements whose trace is a `BasicTrace` keyed through the lookup type `L`.
impl<G, K, V, L> GroupArranged<G, K, V> for Arranged<G, BasicTrace<K, G::Timestamp, V, L>>
where
    G: Scope,
    K: Data,
    V: Data,
    L: Lookup<K, Offset>+'static,
    G::Timestamp: LeastUpperBound {

    /// Builds a `unary_notify` operator over the arranged input stream. The operator re-evaluates
    /// `logic` for affected keys and records the resulting output differences in a new trace.
    ///
    /// When keys arrive at a time, each key that has a difference at that time is scheduled. It is
    /// scheduled at that time and at every further time the input trace reports as one where
    /// updates may occur.
    ///
    /// When a scheduled time completes, `logic` is applied to each scheduled key's collection at
    /// that time. The output is then compared with what the result trace already holds for the
    /// key, and the difference is both emitted downstream and stored in the result trace at that
    /// time.
    ///
    /// Returns an `Arranged` whose `stream` carries the emitted `(keys, cnts, vals)` batches and
    /// whose `trace` is the result trace.
    fn group<V2, U, KH, Look, LookG, Logic>(&self, key_h: KH, look: LookG, logic: Logic)
        -> Arranged<G, BasicTrace<K,G::Timestamp,V2,Look>>
    where
        V2:    Data,
        U:     Unsigned+Default,
        KH:    Fn(&K)->U+'static,
        Look:  Lookup<K, Offset>+'static,
        LookG: Fn(u64)->Look,
        Logic: Fn(&K, &mut CollectionIterator<DifferenceIterator<V>>, &mut Vec<(V2, i32)>)+'static {

        // Compute floor(log2(peers)): the largest `log_peers` with `2^log_peers <= peers`.
        // It is the argument handed to `look` when building the result trace's lookup.
        let peers = self.stream.scope().peers();
        let mut log_peers = 0;
        while (1 << (log_peers + 1)) <= peers {
            log_peers += 1;
        }

        // `source` is a clone of the input arrangement's trace handle (read for input data).
        // `result` is the new output trace, returned to the caller. `target` is the operator's
        // own handle on that same output trace.
        let source = self.trace.clone();
        let result = Rc::new(RefCell::new(BasicTrace::new(look(log_peers))));
        let target = result.clone();

        // A map from times to the list of keys received at that time.
        let mut inputs = LinearMap::new();

        // A map from times to a list of keys that need processing at that time.
        let mut to_do = LinearMap::new();

        // temporary storage for operator implementations to populate
        // (filled by `logic` for one key, then cleared before the next key)
        let mut buffer = vec![];

        // fabricate a data-parallel operator using the `unary_notify` pattern.
        let stream = self.stream.unary_notify(Pipeline, "GroupArranged", vec![], move |input, output, notificator| {

            // 1. read each input, and stash it in our staging area.
            // only stash the keys, because vals etc are in self.trace
            // Only the key list (`.0`) of the first record of a batch is kept, and `unwrap`
            // panics on an empty batch.
            // NOTE(review): assuming `entry_or_insert` only runs its closure when `time` has no
            // entry yet, only the first batch seen at each time is stashed and notified for.
            // This presumably relies on the arrangement sending one batch per time — confirm.
            input.for_each(|time, data| {
                inputs.entry_or_insert(time.time(), || {
                    notificator.notify_at(time);
                    data.drain(..).next().unwrap().0
                });
            });

            // 2. go through each time of interest that has reached completion
            // times are interesting either because we received data, or because we conclude
            // in the processing of a time that a future time will be interesting.
            notificator.for_each(|capability, _count, notificator| {

                let time = capability.time();

                // 2a. If we received any keys, determine the interesting times for each.
                //     We then enqueue the keys at the corresponding time, for use later.
                if let Some(queue) = inputs.remove_key(&time) {

                    let source = source.borrow();
                    let mut stash = Vec::new();
                    for key in queue {
                        // Only keys whose input actually changed at `time` are scheduled.
                        if source.get_difference(&key, &time).is_some() {

                            // determine times at which updates may occur.
                            // The current time is always included, followed by the trace's
                            // further interesting times for this key.
                            stash.push(capability.time());
                            source.interesting_times(&key, &time, &mut stash);

                            // Schedule `key` at each time. A notification is requested when a
                            // time gets its first scheduled key (assuming `entry_or_insert`
                            // semantics as noted above).
                            for new_time in &stash {
                                to_do.entry_or_insert(new_time.clone(), || {
                                         notificator.notify_at(capability.delayed(new_time));
                                         Vec::new()
                                     })
                                     .push(key.clone());
                            }
                            stash.clear();
                        }
                    }
                }

                // 2b. Process any interesting keys at this time.
                if let Some(mut keys) = to_do.remove_key(&time) {

                    // We would like these keys in a particular order.
                    // We also want to de-duplicate them, in case there are dupes.
                    // Order is by `(key_h(key), key)`.
                    keys.sort_by(|x,y| (key_h(&x), x).cmp(&(key_h(&y), y)));
                    keys.dedup();

                    // accumulations for installation into result
                    let mut accumulation = Compact::new(0,0);

                    // borrow `source` to avoid constant re-borrowing.
                    let mut source_borrow = source.borrow_mut();

                    for key in keys {

                        // acquire an iterator over the collection at `time`.
                        let mut input = source_borrow.get_collection(&key, &time);

                        // if we have some input data, invoke logic to populate buffer.
                        // An empty input leaves `buffer` empty, so any previous output for the
                        // key is retracted below.
                        if input.peek().is_some() { logic(&key, &mut input, &mut buffer); }

                        // sort the buffer, because we can't trust user code to do that.
                        buffer.sort_by(|x,y| x.0.cmp(&y.0));

                        // push differences in to Compact.
                        let mut compact = accumulation.session();

                        {
                            // Compute the output difference: the new output in `buffer` minus
                            // the output already accumulated in `target` at `time`. The two
                            // sides are merged by value and then coalesced.
                            // NOTE(review): presumably `coalesce` sums the weights of equal
                            // values and drops zeros — confirm.
                            let mut borrow = target.borrow_mut();
                            let iter = borrow.get_collection(&key, &time)
                                             .map(|(v, w)| (v,-w))
                                             .merge_by(buffer.iter().map(|&(ref v, w)| (v, w)), |x,y| {
                                                 x.0 <= y.0
                                             });

                            for (val, wgt) in Coalesce::coalesce(iter) {
                                compact.push(val.clone(), wgt);
                            }
                        }

                        // Finish this key's entry in the batch (presumably closing out the
                        // values pushed above — confirm against `Compact`).
                        compact.done(key);
                        buffer.clear();
                    }

                    // Only emit and record a batch if some value difference was pushed.
                    // The batch is sent downstream as cloned `(keys, cnts, vals)`, then moved
                    // into the result trace as its difference at `time`.
                    if accumulation.vals.len() > 0 {
                        output.session(&capability).give((
                            accumulation.keys.clone(), 
                            accumulation.cnts.clone(), 
                            accumulation.vals.clone()
                        ));
                        target.borrow_mut().set_difference(time, accumulation);
                    }
                }
            });
        });

        Arranged { stream: stream, trace: result }
    }
}