//! Aggregates the weights of equal records into at most one record.
//!
//! As differential dataflow streams are unordered and taken to be the accumulation of all records,
//! `consolidate` introduces no semantic change. However, there is a practical difference between
//! a collection that aggregates down to zero records and one that actually has no records: the
//! underlying system can more clearly see that no work must be done in the latter case, and can
//! drop out of, e.g., iterative computations.
//!
//! # Examples
//!
//! This example performs a standard "word count", where each line of text is split into multiple
//! words, each word is paired with a count of 1, and `consolidate` then accumulates the counts
//! of equivalent words.
//!
//! ```ignore
//! stream.flat_map(|line| line.split_whitespace())
//!       .map(|word| (word, 1))
//!       .consolidate();
//! ```
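//!
//! Weights that cancel leave a collection with no records at all. As a sketch (assuming, as in
//! the method examples below, a timely `scope` is in hand):
//!
//! ```ignore
//! // (0, 1) and (0, -1) cancel, so `empty` holds no records after consolidation.
//! let stream = vec![(0, 1), (0, -1)].to_stream(scope);
//! let empty = Collection::new(stream).consolidate();
//! ```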

use std::rc::Rc;
use std::fmt::Debug;

use linear_map::LinearMap;

use timely::dataflow::*;
use timely::dataflow::operators::*;
use timely::dataflow::channels::pact::Exchange;
use timely_sort::{LSBRadixSorter, Unsigned};

use collection::Lookup;
use iterators::coalesce::Coalesce;

use ::{Collection, Data};

/// An extension method for consolidating weighted streams.
pub trait ConsolidateExt<D: Data> {
    /// Aggregates the weights of equal records into at most one record.
    ///
    /// This method uses the type `D`'s `hashed()` method to partition the data.
    /// 
    /// # Examples
    ///
    /// In the following fragment, `result` contains only `(1, 3)`:
    ///
    /// ```ignore
    /// let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
    /// let collection = Collection::new(stream);
    /// let result = collection.consolidate();
    /// ```
    fn consolidate(&self) -> Self;

    /// Aggregates the weights of equal records into at most one record, partitioning the
    /// data using the supplied partition function.
    ///
    /// Note that `consolidate_by` attempts to aggregate weights as it goes, to ensure
    /// that it does not consume more memory than its consolidated contents require. It does
    /// this among blocks of records with the same `part` value, so if you just set all
    /// `part` values to the same value it may not do a great job: you will end up with
    /// lots of blocks full of distinct records. Bear that in mind if you want to be clever.
    ///
    /// # Examples
    ///
    /// In the following fragment, `result` contains only `(1, 3)`:
    ///
    /// ```ignore
    /// let stream = vec![(0,1),(1,1),(0,-1),(1,2)].to_stream(scope);
    /// let collection = Collection::new(stream);
    /// let result = collection.consolidate_by(|&x| x);
    /// ```    
    fn consolidate_by<U: Unsigned, F: Fn(&D)->U+'static>(&self, part: F) -> Self;
}

impl<G: Scope, D: Ord+Data+Debug> ConsolidateExt<D> for Collection<G, D> {
    fn consolidate(&self) -> Self {
        self.consolidate_by(|x| x.hashed())
    }

    fn consolidate_by<U: Unsigned, F: Fn(&D)->U+'static>(&self, part: F) -> Self {
        let mut inputs = LinearMap::new();    // LinearMap<G::Timestamp, Vec<(D, i32)>>
        let part1 = Rc::new(part);
        let part2 = part1.clone();

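        // route each record by `part` of its datum, so that equal records (which have equal
        // `part` values) all arrive at the same worker and can be combined there.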
        let exch = Exchange::new(move |&(ref x,_)| (*part1)(x).as_u64());
        Collection::new(self.inner.unary_notify(exch, "Consolidate", vec![], move |input, output, notificator| {

            // Consolidation needs to address the issue that the amount of incoming data
            // may be too large to maintain in queues, but small enough once aggregated.
            // Although arbitrarily large queues *should* work at sequential disk I/O rates (?),
            // it seems like we should be smarter than that. We could overflow the disk when
            // counting crazy 12-cliques or something, right?

            // So, we can't just pop data off and enqueue it, we need to do something like
            // sort and consolidate what we have, or something like that. We know how to 
            // sort and consolidate (see below), but who knows how we should be doing merges
            // and weird stuff like that... Probably power-of-two merges or somesuch...

            // If we thought we were unlikely to have collisions, we could just sort each 
            // of the hunks we get out of the radix sorter and coalesce them. This could be
            // arbitrarily ineffective if we have lots of collisions though (consolidating 
            // by a key of a (key,val) pair, for some reason). Let's try it for now.

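            // 1. receive data: radix-sort incoming records by `part`, and once the buffered
            //    count exceeds the threshold, coalesce adjacent equal records within each
            //    sorted block to keep memory bounded.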
            input.for_each(|index, data| {

                // set very large to turn off in-flight compaction (use a finite value, like the one below, to enable it).
                let default_threshold = usize::max_value();
                // let default_threshold = 256 << 10;

                // an entry stores a sorter (the data), a current count, and a compaction threshold.
                let entry = inputs.entry_or_insert(index.time(), || (LSBRadixSorter::new(), 0, default_threshold));
                let (ref mut sorter, ref mut count, ref mut thresh) = *entry;

                *count += data.len();
                sorter.extend(data.drain(..), &|x| (*part2)(&x.0));

                if *count > *thresh {

                    *count = 0; 
                    *thresh = 0;

                    // pull out blocks sorted by the hash; coalesce each.
                    let finished = sorter.finish(&|x| (*part2)(&x.0));
                    for mut block in finished {
                        let mut finger = 0;
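                        // fold runs of equal adjacent records into their first occurrence
                        // (the "finger"), zeroing the weights of the rest.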
                        for i in 1..block.len() {
                            if block[finger].0 == block[i].0 {
                                block[finger].1 += block[i].1;
                                block[i].1 = 0;
                            }
                            else {
                                finger = i;
                            }
                        }
                        block.retain(|x| x.1 != 0);
                        *thresh += block.len();
                        sorter.push_batch(block, &|x| (*part2)(&x.0));
                    }

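                    // keep the threshold at least at the default, so the next compaction
                    // triggers only once the newly buffered count exceeds the larger of
                    // the retained size and the default threshold.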
                    if *thresh < default_threshold { *thresh = default_threshold; }
                }

                notificator.notify_at(index);
            });

            // 2. go through each time of interest that has reached completion
            notificator.for_each(|index, _count, _notificator| {

                // pull out sorter, ignore count and thresh (irrelevant now).
                if let Some((mut sorter, _, _)) = inputs.remove_key(&index) {

                    let mut session = output.session(&index);
                    let mut buffer = vec![];
                    let mut current = 0;

                    let source = sorter.finish(&|x| (*part2)(&x.0));
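                    // records come back grouped by their `part` hash; buffer each hash group,
                    // sort it by datum, and emit the group with equal records coalesced.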
                    for (datum, wgt) in source.into_iter().flat_map(|x| x.into_iter()) {
                        let hash = (*part2)(&datum).as_u64();
                        if !buffer.is_empty() && hash != current {
                            buffer.sort_by(|x: &(D,i32),y: &(D,i32)| x.0.cmp(&y.0));
                            session.give_iterator(buffer.drain(..).coalesce());
                        }
                        buffer.push((datum,wgt));
                        current = hash;
                    }

                    if !buffer.is_empty() {
                        buffer.sort_by(|x: &(D,i32),y: &(D,i32)| x.0.cmp(&y.0));
                        session.give_iterator(buffer.drain(..).coalesce());
                    }
                }
            });
        }))
    }
}