1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! Match pairs of records based on a key.

use std::default::Default;
use std::collections::HashMap;

use linear_map::LinearMap;
use vec_map::VecMap;

use timely::progress::Timestamp;
use timely::dataflow::Scope;
use timely::dataflow::operators::Binary;
use timely::dataflow::channels::pact::Pipeline;
use timely_sort::Unsigned;

use ::{Data, Collection};
use lattice::Lattice;
use collection::Lookup;
use collection::trace::{Trace,TraceRef};
use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};

/// Join implementations for `(key,val)` data.
///
/// The methods come in two families with parallel signatures: the plain methods
/// (`join`, `join_map`, `semijoin`, `antijoin`) and the `_u` methods, which additionally
/// require `K: Unsigned+Default` and are intended for dense unsigned integer keys.
pub trait Join<G: Scope, K: Data, V: Data> {

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key`.
    ///
    /// The `join` method requires that the two collections both be over pairs of records, and the
    /// first element of the pair must be of the same type. Given two such collections, each pair
    /// of records `(key,val1)` and `(key,val2)` with a matching `key` produces a `(key, val1, val2)`
    /// output record.
    ///
    /// This default implementation forwards to `join_map`, cloning the key and both values.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![((0,'a'),1),((1,'B'),1)].into_iter().to_stream(scope);
    ///
    ///     // should produce triples `(0,0,'a')` and `(1,2,'B')`.
    ///     col1.join(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,0,'a'),1), ((1,2,'B'),1)]);
    /// ```
    fn join<V2: Data>(&self, other: &Collection<G, (K,V2)>) -> Collection<G, (K,V,V2)> {
        self.join_map(other, |k,v1,v2| (k.clone(), v1.clone(), v2.clone()))
    }
    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
    ///
    /// `logic` receives references to the key and to the two matched values, and returns the
    /// record to emit for that match.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![((0,'a'),1),((1,'B'),1)].into_iter().to_stream(scope);
    ///
    ///     // should produce records `(0 + 0,'a')` and `(1 + 2,'B')`.
    ///     col1.join_map(&col2, |k,v1,v2| (*k + *v1, *v2)).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,'a'),1), ((3,'B'),1)]);
    /// ```
    fn join_map<V2: Data, D: Data, R: Fn(&K, &V, &V2)->D+'static>(&self, other: &Collection<G, (K,V2)>, logic: R) -> Collection<G, D>;
    /// Matches pairs `(key,val1)` and `key` based on `key`, filtering the first collection by values present in the second.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![(0,1)].into_iter().to_stream(scope);
    ///
    ///     // should retain record `(0,0)` and discard `(1,2)`.
    ///     col1.semijoin(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,0),1)]);
    /// ```
    fn semijoin(&self, other: &Collection<G, K>) -> Collection<G, (K, V)>;
    /// Matches pairs `(key,val1)` and `key` based on `key`, discarding values
    /// in the first collection if their key is present in the second.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![(0,1)].into_iter().to_stream(scope);
    ///
    ///     // should discard record `(0,0)` and retain `(1,2)`.
    ///     col1.antijoin(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((1,2),1)]);
    /// ```
    fn antijoin(&self, other: &Collection<G, K>) -> Collection<G, (K, V)>;

    /// Joins two collections with dense unsigned integer keys.
    ///
    /// This default implementation forwards to `join_map_u`, cloning the key and both values.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![((0,'a'),1),((1,'B'),1)].into_iter().to_stream(scope);
    ///
    ///     // should produce triples `(0,0,'a')` and `(1,2,'B')`.
    ///     col1.join_u(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,0,'a'),1), ((1,2,'B'),1)]);
    /// ```
    fn join_u<V2: Data>(&self, other: &Collection<G, (K,V2)>) -> Collection<G, (K,V,V2)> where K: Unsigned+Default {
        self.join_map_u(other, |k,v1,v2| (k.clone(), v1.clone(), v2.clone()))
    }
    /// Joins two collections with dense unsigned integer keys and then applies a map function.
    ///
    /// `logic` receives references to the key and to the two matched values, and returns the
    /// record to emit for that match.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![((0,'a'),1),((1,'B'),1)].into_iter().to_stream(scope);
    ///
    ///     // should produce records `(0 + 0,'a')` and `(1 + 2,'B')`.
    ///     col1.join_map_u(&col2, |k,v1,v2| (*k + *v1, *v2)).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,'a'),1), ((3,'B'),1)]);
    /// ```
    fn join_map_u<V2: Data, D: Data, R: Fn(&K, &V, &V2)->D+'static>(&self, other: &Collection<G, (K,V2)>, logic: R) -> Collection<G, D> where K: Unsigned+Default;
    /// Semijoins a collection with dense unsigned integer keys against a set of such keys.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![(0,1)].into_iter().to_stream(scope);
    ///
    ///     // should retain record `(0,0)` and discard `(1,2)`.
    ///     col1.semijoin_u(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((0,0),1)]);
    /// ```
    fn semijoin_u(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> where K: Unsigned+Default;
    /// Antijoins a collection with dense unsigned integer keys against a set of such keys.
    ///
    /// # Examples
    /// ```ignore
    /// extern crate timely;
    /// use timely::dataflow::operators::{ToStream, Capture};
    /// use timely::dataflow::operators::capture::Extract;
    /// use differential_dataflow::operators::Join;
    ///
    /// let data = timely::example(|scope| {
    ///     let col1 = vec![((0,0),1),((1,2),1)].into_iter().to_stream(scope);
    ///     let col2 = vec![(0,1)].into_iter().to_stream(scope);
    ///
    ///     // should discard record `(0,0)` and retain `(1,2)`.
    ///     col1.antijoin_u(&col2).capture();
    /// });
    ///
    /// let extracted = data.extract();
    /// assert_eq!(extracted.len(), 1);
    /// assert_eq!(extracted[0].1, vec![((1,2),1)]);
    /// ```
    fn antijoin_u(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> where K: Unsigned+Default;
} 

impl<G: Scope, K: Data, V: Data> Join<G, K, V> for Collection<G, (K, V)> where G::Timestamp: Lattice {
    /// Arranges both inputs by key into `HashMap`-backed arrangements and joins them,
    /// applying `logic` to every matching `(key, val1, val2)`.
    fn join_map<V2: Data, D: Data, R>(&self, other: &Collection<G, (K, V2)>, logic: R) -> Collection<G, D>
    where R: Fn(&K, &V, &V2)->D+'static {
        let left = self.arrange_by_key(|key| key.hashed(), |_| HashMap::new());
        let right = other.arrange_by_key(|key| key.hashed(), |_| HashMap::new());
        left.join(&right, logic)
    }

    /// Keeps the `(key, val)` records of `self` whose key is matched in `other`.
    ///
    /// `other` is arranged by the records themselves; the joined value from that side is ignored.
    fn semijoin(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> {
        let records = self.arrange_by_key(|key| key.hashed(), |_| HashMap::new());
        let key_set = other.arrange_by_self(|key| key.hashed(), |_| HashMap::new());
        records.join(&key_set, |key, val, _| (key.clone(), val.clone()))
    }

    /// Produces `self` concatenated with the negation of `self.semijoin(other)`.
    fn antijoin(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> {
        let matched = self.semijoin(other);
        self.concat(&matched.negate())
    }

    /// As `join_map`, but both inputs are arranged with the key itself (cloned) as the
    /// index and a `VecMap`-based lookup structure.
    fn join_map_u<V2: Data, D: Data, R: Fn(&K, &V, &V2)->D+'static>(&self, other: &Collection<G, (K,V2)>, logic: R) -> Collection<G, D> where K: Unsigned+Default {
        let left = self.arrange_by_key(|key| key.clone(), |arg| (VecMap::new(), arg));
        let right = other.arrange_by_key(|key| key.clone(), |arg| (VecMap::new(), arg));
        left.join(&right, logic)
    }

    /// As `semijoin`, but using `VecMap`-based arrangements indexed directly by the key.
    fn semijoin_u(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> where K: Unsigned+Default {
        let records = self.arrange_by_key(|key| key.clone(), |arg| (VecMap::new(), arg));
        let key_set = other.arrange_by_self(|key| key.clone(), |arg| (VecMap::new(), arg));
        records.join(&key_set, |key, val, _| (key.clone(), val.clone()))
    }

    /// Produces `self` concatenated with the negation of `self.semijoin_u(other)`.
    fn antijoin_u(&self, other: &Collection<G, K>) -> Collection<G, (K, V)> where K: Unsigned+Default {
        let matched = self.semijoin_u(other);
        self.concat(&matched.negate())
    }
}

// /// Matches pairs `(key, val1)` and `(key, val2)` for dense unsigned integer keys.
// ///
// /// These methods are optimizations of the general `Join` trait to use `Vec` indices rather than
// /// a more generic hash map. This can substantially reduce the amount of computation and memory
// /// required, but it will allocate as much memory as the largest identifier and so may have poor
// /// performance if the absolute range of keys is large.
// ///
// /// These methods may be deprecated in favor of an approach which allows implementors of `Data` to define
// /// their own approach to indexing data. In this case, a newtype wrapping dense unsigned integers would indicate
// /// the indexing strategy, and the methods would simply be as above.

// pub trait JoinUnsigned<G: Scope, U: Unsigned+Data+Default, V: Data> where G::Timestamp: Lattice {

// }

// impl<G: Scope, U: Unsigned+Data+Default, V: Data> JoinUnsigned<G, U, V> for Collection<G, (U, V)> where G::Timestamp: Lattice {
//     fn join_map_u<V2: Data, D: Data, R: Fn(&U, &V, &V2)->D+'static>(&self, other: &Collection<G, (U,V2)>, logic: R) -> Collection<G, D> {
//         let arranged1 = self.arrange_by_key(|k| k.clone(), |x| (VecMap::new(), x));
//         let arranged2 = other.arrange_by_key(|k| k.clone(), |x| (VecMap::new(), x));
//         arranged1.join(&arranged2, logic)
//     }
//     fn semijoin_u(&self, other: &Collection<G, U>) -> Collection<G, (U, V)> {
//         let arranged1 = self.arrange_by_key(|k| k.clone(), |x| (VecMap::new(), x));
//         let arranged2 = other.arrange_by_self(|k| k.clone(), |x| (VecMap::new(), x));
//         arranged1.join(&arranged2, |k,v,_| (k.clone(), v.clone()))        
//     }
// }

/// Matches the elements of two arranged traces.
///
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `group` operator.
pub trait JoinArranged<G: Scope, K: Data, V: Data> where G::Timestamp: Lattice {
    /// Joins two arranged collections with the same key type.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` is passed to the `result`
    /// function, producing a corresponding output record.
    ///
    /// `stream2` is the other arrangement; its trace type `T2` must share this arrangement's
    /// key type `K` and be indexed by the scope's timestamp. The returned collection contains
    /// the records produced by `result`.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    fn join<T2,R,RF> (&self, stream2: &Arranged<G,T2>, result: RF) -> Collection<G,R>
    where 
        T2: Trace<Key=K,Index=G::Timestamp>+'static,
        R: Data,
        RF: Fn(&K,&V,&T2::Value)->R+'static,
        for<'a> &'a T2: TraceRef<'a,T2::Key,T2::Index,T2::Value>
        ;
}

/// Implements `JoinArranged` for any `Arranged<G, T>` whose trace is indexed by the scope's
/// timestamp, using the trace's own key and value types as the join's `K` and `V`.
impl<TS: Timestamp, G: Scope<Timestamp=TS>, T: Trace<Index=TS>+'static> JoinArranged<G, T::Key, T::Value> for Arranged<G, T> 
    where 
        G::Timestamp: Lattice, 
        for<'a> &'a T: TraceRef<'a, T::Key, T::Index, T::Value> {
    /// Joins this arrangement with `other`, applying `result` to each matching
    /// `(key, val1, val2)` and emitting the outcome with weight `wgt1 * wgt2`.
    ///
    /// The join is a single `binary_notify` operator named "Join" over the two arranged
    /// streams. Incoming batches are queued by time; when a time is notified, the batch from
    /// each input is matched against the *other* input's trace, restricted to trace entries at
    /// times already acknowledged. Each output is buffered at the lattice `join` of the batch
    /// time and the trace entry's time, and is sent once that time is itself notified.
    fn join<T2,R,RF>(&self, other: &Arranged<G,T2>, result: RF) -> Collection<G,R> 
    where 
        T2: Trace<Key=T::Key, Index=G::Timestamp>+'static,
        R: Data,
        RF: Fn(&T::Key,&T::Value,&T2::Value)->R+'static,
        for<'a> &'a T2: TraceRef<'a,T2::Key,T2::Index,T2::Value> {

        // Handles to the two traces, wrapped in `Option` so each can be dropped once it can
        // no longer be needed (see the shutdown checks at the top of the operator closure).
        // NOTE(review): presumably shared, reference-counted handles, given `.clone()` here
        // and `.borrow()` below; confirm against `Arranged`.
        let mut trace1 = Some(self.trace.clone());
        let mut trace2 = Some(other.trace.clone());

        // Pending input batches for each input, and pending output records, all keyed by time.
        let mut inputs1 = LinearMap::new();
        let mut inputs2 = LinearMap::new();
        let mut outbuf = LinearMap::new();

        // upper envelope of notified times;
        // used to restrict diffs processed.
        let mut acknowledged = Vec::new();

        let result = self.stream.binary_notify(&other.stream, Pipeline, Pipeline, "Join", vec![], move |input1, input2, output, notificator| {

            // shut down a trace if the opposing input has been closed out.
            // A trace is only consulted when processing a batch from the opposite input, so
            // once that input's frontier is empty and none of its batches remain queued, the
            // trace handle is released.
            // TODO : more generally, we would like to announce our frontier to each trace, so that it may
            // TODO : be compacted when the frontiers of all of its referees have advanced past some point.
            if trace2.is_some() && notificator.frontier(0).len() == 0 && inputs1.len() == 0 { trace2 = None; }
            if trace1.is_some() && notificator.frontier(1).len() == 0 && inputs2.len() == 0 { trace1 = None; }

            // read input 1, push all data to queues
            // Each message is asserted to carry exactly one element, which is stored as the
            // batch for its time and later destructured as `(keys, cnts, vals)`.
            // NOTE(review): if a batch is already queued for this time, `entry_or_insert`
            // presumably leaves it in place and the new element is not stored; confirm that at
            // most one message arrives per time.
            input1.for_each(|time, data| {
                assert!(data.len() == 1);
                inputs1.entry_or_insert(time.time(), || data.drain(..).next().unwrap());
                notificator.notify_at(time);
            });

            // read input 2, push all data to queues (same handling as input 1)
            input2.for_each(|time, data| {
                assert!(data.len() == 1);
                inputs2.entry_or_insert(time.time(), || data.drain(..).next().unwrap());
                notificator.notify_at(time);
            });

            // Notification means we have inputs to process or outputs to send.
            // while let Some((capability, _count)) = notificator.next() {
            notificator.for_each(|capability, _count, notificator| {

                let time = capability.time();

                // We must be careful to only respond to pairs of differences at `time` with
                // one output record, not two. To do this correctly, we acknowledge the time
                // after processing the first input, so that it is as if we added the inputs
                // to the traces in this order.

                // TODO : Evaluate only looking up values by key rather than receiving them.

                // TODO : It is probably wise to compact the results from each key before sending.
                // TODO : At least, "changes" in join inputs often result in cancellations.

                // TODO : Consider delaying the production of output tuples until the time
                // TODO : has been notified; like `Group`. This would address the issues above
                // TODO : by looking up data only once it is present, ensuring only non-cancelled
                // TODO : records would be transmitted (also a good thing).

                // compare fresh data on the first input against stale data on the second
                // (`time` has not been acknowledged yet, so only entries of the second trace at
                // previously acknowledged times pass the filter below).
                if let Some(ref trace) = trace2 {
                    if let Some((keys, cnts, vals)) = inputs1.remove_key(&time) {
                        // `vals` is a flat sequence of `(value, weight)` pairs; `cnts[i]` says
                        // how many consecutive entries belong to `keys[i]`.
                        let mut vals = vals.iter();
                        for (key, &cnt) in keys.iter().zip(cnts.iter()) {
                            let borrow = trace.borrow();
                            for (t, diffs) in borrow.trace(key) {
                                // only match against trace entries at or before an acknowledged time.
                                if acknowledged.iter().any(|t2| t <= t2) {
                                    // outputs are buffered at the lattice join of the two times.
                                    let mut output = outbuf.entry_or_insert(time.join(t), || Vec::new());
                                    for (ref val2, wgt2) in diffs {
                                        // `vals.clone().take(cnt)` visits this key's values
                                        // without advancing the shared `vals` cursor.
                                        for &(ref val1, wgt1) in vals.clone().take(cnt as usize) {
                                            output.push((result(key, val1, val2), wgt1 * wgt2));
                                        }
                                    }
                                }
                            }
                            // advance the cursor past this key's values.
                            for _ in 0..cnt { vals.next(); }
                        }
                    }
                }

                // acknowledge the time, so we can use it below
                // (entries dominated by `time` are dropped first, keeping only maximal times).
                acknowledged.retain(|t| !(t <= &time));
                acknowledged.push(time.clone());

                // compare fresh data on the second input against data on the first; because
                // `time` was just acknowledged, entries of the first trace at `time` itself
                // now pass the filter as well.
                if let Some(ref trace) = trace1 {         
                    if let Some((keys, cnts, vals)) = inputs2.remove_key(&time) {
                        let mut vals = vals.iter();
                        for (key, &cnt) in keys.iter().zip(cnts.iter()) {
                            let borrow = trace.borrow();
                            for (t, diffs) in borrow.trace(key) {
                                if acknowledged.iter().any(|t2| t <= t2) {
                                    let mut output = outbuf.entry_or_insert(time.join(t), || Vec::new());
                                    for (ref val1, wgt1) in diffs {
                                        for &(ref val2, wgt2) in vals.clone().take(cnt as usize) {
                                            output.push((result(key, val1, val2), wgt1 * wgt2));
                                        }
                                    }
                                }
                            }
                            for _ in 0..cnt { vals.next(); }
                        }
                    }
                }

                // TODO : This only sends data at the current time.
                // TODO : this may be unwise, as the `join` may produce
                // TODO : more data than can be easily stored without
                // TODO : aggregation. It may be that we should send everything
                // TODO : and let the receiver store the data as it sees fit.
                //
                // TODO : See note above about delaying evaluation of time
                // TODO : until notification that each input has reached time.
                // TODO : Likely more expensive, but keeps memory footprint
                // TODO : proportional to input, rather than output sizes.
                if let Some(mut buffer) = outbuf.remove_key(&time) {
                    output.session(&capability).give_iterator(buffer.drain(..));
                }

                // make sure we hold capabilities for each time still to send at.
                // Every buffered output time that the current capability's time is `le` gets a
                // notification request via a delayed capability, so that its buffer is
                // eventually sent by the branch above.
                for (new_time, _) in &outbuf {
                    // NOTE : WHOA THIS IS MESSED UP;
                    // NOTE(review): the original author flagged this check as suspect. It
                    // re-requests notification for all qualifying buffered times on every
                    // notification, not only the ones added in this round; confirm intent.
                    if capability.time().le(new_time) {
                        notificator.notify_at(capability.delayed(new_time));
                    }
                }
            });
        });

        Collection::new(result)
    }
}