1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
//! An operator acting on `(Key, ())` collections.

use std::rc::Rc;
use std::cell::RefCell;
// use std::ops::DerefMut;
use std::default::Default;

use std::collections::HashMap;
use linear_map::LinearMap;
use vec_map::VecMap;

use ::{Collection, Data, Delta};
use timely::dataflow::*;
use timely::dataflow::operators::Unary;
use timely::dataflow::channels::pact::Pipeline;
// use timely::dataflow::channels::pact::Exchange;

use timely_sort::Unsigned;

use collection::{LeastUpperBound, Lookup};
use collection::count::Offset;
use collection::count::Count as CountTrace;
use collection::compact::Compact;
use collection::trace::Trace;

use operators::arrange::{Arranged, ArrangeBySelf};

/// Extension trait for the `threshold` differential dataflow method
pub trait Threshold<G: Scope, K: Data>
    where G::Timestamp: LeastUpperBound {

    /// Groups records by their first field, and applies reduction logic to the associated values.
    ///
    /// It would be nice for this to be generic over possible arrangements of data, but it seems that Rust
    /// ICEs when I try this. I'm not exactly sure how one would specify the arrangement (the type is used
    /// in `logic`, by way of its associated iterator types, but not clearly enough to drive type inference),
    /// but we don't have that problem yet anyhow.
    ///
    /// In any case, it would be great if the same implementation could handle `(K,V)` pairs and `K` elements
    /// treated as `(K, ())` pairs.
    fn threshold<L>(&self, logic: L) -> Collection<G, K> where L: Fn(&K, Delta)->Delta+'static;

    /// Collects distinct elements.
    fn distinct(&self) -> Collection<G, K> {
        self.threshold(|_,_| 1)
    }
}

impl<G: Scope, K: Data+Default> Threshold<G, K> for Collection<G, K> where G::Timestamp: LeastUpperBound {
    fn threshold<L>(&self, logic: L) -> Collection<G, K>
        where L: Fn(&K, Delta)->Delta+'static {
            self.arrange_by_self(|k| k.hashed(), |_| HashMap::new())
                .threshold(|k| k.hashed(), |_| HashMap::new(), logic)
                .as_collection()
                .map(|(k,_)| k)
    }
}


/// Extension trait for the `group_u` differential dataflow method.
pub trait ThresholdUnsigned<G: Scope, U: Unsigned+Data+Default>
    where G::Timestamp: LeastUpperBound {
    /// Groups records by their first field, when this field implements `Unsigned`.
    ///
    /// This method uses a `Vec<Option<_>>` as its internal storage, allocating
    /// enough memory to directly index based on the first fields. This can be
    /// very useful when these fields are small integers, but it can be expensive
    /// if they are large and sparse.
    fn threshold_u<L>(&self, logic: L) -> Collection<G, U> where L: Fn(&U, Delta)->Delta+'static;

    /// Collects distinct elements, when they implement `Unsigned`.
    fn distinct_u(&self) -> Collection<G, U> {
        self.threshold_u(|_,_| 1)
    }
}

impl<G: Scope, U: Unsigned+Data+Default> ThresholdUnsigned<G, U> for Collection<G, U>
where G::Timestamp: LeastUpperBound {
    fn threshold_u<L>(&self, logic: L) -> Collection<G, U>
        where L: Fn(&U, Delta)->Delta+'static {
            self.arrange_by_self(|k| k.as_u64(), |x| (VecMap::new(), x))
                .threshold(|k| k.as_u64(), |x| (VecMap::new(), x), logic)
                .as_collection()
                .map(|(k,_)| k)
        }
}


/// Extension trait for the `group` operator on `Arrange<_>` data.
pub trait ThresholdArranged<G: Scope, K: Data> {
    /// Groups arranged data using a key hash function, a lookup generator, and user logic.
    ///
    /// This method is used by `group` and `group_u` as defined on `Collection`, and it can
    /// also be called directly by user code that wants access to the arranged output. This
    /// can be helpful when the resulting data are re-used immediately with the same keys.
    fn threshold<U, KH, Look, LookG, Logic>(&self, key_h: KH, look: LookG, logic: Logic)
        -> Arranged<G,CountTrace<K,G::Timestamp,Look>>
    where
        G::Timestamp: LeastUpperBound,
        U:     Unsigned+Default,
        KH:    Fn(&K)->U+'static,
        Look:  Lookup<K, Offset>+'static,
        LookG: Fn(u64)->Look,
        Logic: Fn(&K, Delta)->Delta+'static;
}

impl<G, K, L> ThresholdArranged<G, K> for Arranged<G, CountTrace<K, G::Timestamp, L>>
where
    G: Scope,
    K: Data,
    L: Lookup<K, Offset>+'static,
    G::Timestamp: LeastUpperBound {

    fn threshold<U, KH, Look, LookG, Logic>(&self, key_h: KH, look: LookG, logic: Logic)
        -> Arranged<G, CountTrace<K,G::Timestamp,Look>>
    where
        U:     Unsigned+Default,
        KH:    Fn(&K)->U+'static,
        Look:  Lookup<K, Offset>+'static,
        LookG: Fn(u64)->Look,
        Logic: Fn(&K, Delta)->Delta+'static {

        let peers = self.stream.scope().peers();
        let mut log_peers = 0;
        while (1 << (log_peers + 1)) <= peers {
            log_peers += 1;
        }

        let source = self.trace.clone();
        let result = Rc::new(RefCell::new(CountTrace::new(look(log_peers))));
        let target = result.clone();

        // A map from times to received (key, val, wgt) triples.
        let mut inputs = LinearMap::new();

        // A map from times to a list of keys that need processing at that time.
        let mut to_do = LinearMap::new();

        // fabricate a data-parallel operator using the `unary_notify` pattern.
        let stream = self.stream.unary_notify(Pipeline, "CountArranged", vec![], move |input, output, notificator| {

            // 1. read each input, and stash it in our staging area.
            // only stash the keys, because vals etc are in self.trace
            input.for_each(|time, data| {
                inputs.entry_or_insert(time.time(), || {
                    notificator.notify_at(time);
                    data.drain(..).next().unwrap().0
                });
            });

            // 2. go through each time of interest that has reached completion
            // times are interesting either because we received data, or because we conclude
            // in the processing of a time that a future time will be interesting.

            notificator.for_each(|capability, _count, notificator| {

                let time = capability.time();

                // 2a. If we received any keys, determine the interesting times for each.
                //     We then enqueue the keys at the corresponding time, for use later.
                if let Some(queue) = inputs.remove_key(&time) {

                    let source = source.borrow();
                    let mut stash = Vec::new();
                    for key in queue {
                        if source.get_difference(&key, &time).is_some() {

                            // determine times at which updates may occur.
                            stash.push(capability.time());
                            source.interesting_times(&key, &time, &mut stash);

                            for new_time in &stash {
                                to_do.entry_or_insert(new_time.clone(), || {
                                         notificator.notify_at(capability.delayed(new_time));
                                         Vec::new()
                                     })
                                     .push(key.clone());
                            }
                            stash.clear();
                        }
                    }
                }

                // 2b. Process any interesting keys at this time.
                if let Some(mut keys) = to_do.remove_key(&time) {

                    // We would like these keys in a particular order.
                    // We also want to de-duplicate them, in case there are dupes.
                    keys.sort_by(|x,y| (key_h(&x), x).cmp(&(key_h(&y), y)));
                    keys.dedup();

                    // accumulations for installation into result
                    let mut accumulation = Compact::new(0,0);

                    // borrow `source` to avoid constant re-borrowing.
                    let source_borrow = source.borrow_mut();

                    for key in keys {

                        let count = source_borrow.get_count(&key, &time);
                        let output = if count > 0 { logic(&key, count) } else { 0 };
                        let current = target.borrow().get_count(&key, &time);

                        if output != current {
                            let mut compact = accumulation.session();
                            // session.give((key.clone(), output - current));
                            compact.push((), output - current);
                            compact.done(key);
                        }

                    }

                    if accumulation.vals.len() > 0 {
                        output.session(&capability).give((accumulation.keys.clone(), accumulation.cnts.clone(), accumulation.vals.clone()));
                        target.borrow_mut().set_difference(time, accumulation);
                    }
                }
            });
        });

        Arranged { stream: stream, trace: result }
    }
}