differential_dataflow/operators/
join.rs

1//! Match pairs of records based on a key.
2//!
3//! The various `join` implementations require that the units of each collection can be multiplied, and that
4//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
5//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
6use std::cmp::Ordering;
7
8use timely::order::PartialOrder;
9use timely::progress::Timestamp;
10use timely::dataflow::Scope;
11use timely::dataflow::operators::generic::{Operator, OutputHandle};
12use timely::dataflow::channels::pact::Pipeline;
13use timely::dataflow::operators::Capability;
14use timely::dataflow::channels::pushers::tee::Tee;
15
16use crate::hashable::Hashable;
17use crate::{Data, ExchangeData, Collection, AsCollection};
18use crate::difference::{Semigroup, Abelian, Multiply};
19use crate::lattice::Lattice;
20use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf};
21use crate::trace::{BatchReader, Cursor};
22use crate::operators::ValueHistory;
23
24use crate::trace::TraceReader;
25
/// Join implementations for `(key,val)` data.
pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
    ///
    /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
    ///
    ///     x.join(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join<V2, R2>(&self, other: &Collection<G, (K,V2), R2>) -> Collection<G, (K,(V,V2)), <R as Multiply<R2>>::Output>
    where
        K: ExchangeData,
        V2: ExchangeData,
        R2: ExchangeData+Semigroup,
        R: Multiply<R2>,
        <R as Multiply<R2>>::Output: Semigroup
    {
        // Default implementation: defer to `join_map`, cloning the matched key and
        // values into the output pairs. Multiplicities multiply per the module docs.
        self.join_map(other, |k,v,v2| (k.clone(),(v.clone(),v2.clone())))
    }

    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_map(&y, |_key, &a, &b| (a,b))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_map<V2, R2, D, L>(&self, other: &Collection<G, (K,V2), R2>, logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
    where K: ExchangeData, V2: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup, D: Data, L: FnMut(&K, &V, &V2)->D+'static;

    /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
    ///
    /// When the second collection contains frequencies that are either zero or one this is the more traditional
    /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
    /// the counts of the records in the first input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
    ///
    ///     x.semijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn semijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup;

    /// Subtracts the semijoin with `other` from `self`.
    ///
    /// In the case that `other` has multiplicities zero or one this results
    /// in a relational antijoin, in which we discard input records whose key
    /// is present in `other`. If the multiplicities could be other than zero
    /// or one, the semantic interpretation of this operator is less clear.
    ///
    /// In almost all cases, you should ensure that `other` has multiplicities
    /// that are zero or one, perhaps by using the `distinct` operator.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Join;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
    ///     let y = scope.new_collection_from(vec![0, 2]).1;
    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
    ///
    ///     x.antijoin(&y)
    ///      .assert_eq(&z);
    /// });
    /// ```
    // Note the `R: Abelian` bound: subtraction requires negatable differences.
    fn antijoin<R2>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
    where K: ExchangeData, R2: ExchangeData+Semigroup, R: Multiply<R2, Output = R>, R: Abelian;
}
135
136impl<G, K, V, R> Join<G, K, V, R> for Collection<G, (K, V), R>
137where
138    G: Scope,
139    K: ExchangeData+Hashable,
140    V: ExchangeData,
141    R: ExchangeData+Semigroup,
142    G::Timestamp: Lattice+Ord,
143{
144    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <R as Multiply<R2>>::Output>
145    where R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup, L: FnMut(&K, &V, &V2)->D+'static {
146        let arranged1 = self.arrange_by_key();
147        let arranged2 = other.arrange_by_key();
148        arranged1.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
149    }
150
151    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <R as Multiply<R2>>::Output>
152    where R: Multiply<R2>, <R as Multiply<R2>>::Output: Semigroup {
153        let arranged1 = self.arrange_by_key();
154        let arranged2 = other.arrange_by_self();
155        arranged1.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
156    }
157
158    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), R>
159    where R: Multiply<R2, Output=R>, R: Abelian {
160        self.concat(&self.semijoin(other).negate())
161    }
162}
163
164impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
165where
166    G: Scope,
167    G::Timestamp: Lattice+Ord,
168    Tr: for<'a> TraceReader<Time=G::Timestamp, Key<'a> = &'a K, Val<'a> = &'a V>+Clone+'static,
169    K: ExchangeData+Hashable,
170    V: Data + 'static,
171    Tr::Diff: Semigroup,
172{
173    fn join_map<V2: ExchangeData, R2: ExchangeData+Semigroup, D: Data, L>(&self, other: &Collection<G, (K, V2), R2>, mut logic: L) -> Collection<G, D, <Tr::Diff as Multiply<R2>>::Output>
174    where 
175        Tr::Diff: Multiply<R2>,
176        <Tr::Diff as Multiply<R2>>::Output: Semigroup,
177        L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2)->D+'static,
178    {
179        let arranged2 = other.arrange_by_key();
180        self.join_core(&arranged2, move |k,v1,v2| Some(logic(k,v1,v2)))
181    }
182
183    fn semijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
184    where Tr::Diff: Multiply<R2>, <Tr::Diff as Multiply<R2>>::Output: Semigroup {
185        let arranged2 = other.arrange_by_self();
186        self.join_core(&arranged2, |k,v,_| Some((k.clone(), v.clone())))
187    }
188
189    fn antijoin<R2: ExchangeData+Semigroup>(&self, other: &Collection<G, K, R2>) -> Collection<G, (K, V), Tr::Diff>
190    where Tr::Diff: Multiply<R2, Output=Tr::Diff>, Tr::Diff: Abelian {
191        self.as_collection(|k,v| (k.clone(), v.clone()))
192            .concat(&self.semijoin(other).negate())
193    }
194}
195
/// Matches the elements of two arranged traces.
///
/// This method is used by the various `join` implementations, but it can also be used
/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
/// the arrangement is available for re-use, or from the output of a `reduce` operator.
pub trait JoinCore<G: Scope, K: 'static + ?Sized, V: 'static + ?Sized, R: Semigroup> where G::Timestamp: Lattice+Ord {

    /// Joins two arranged collections with the same key type.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
    /// every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
    ///
    ///     x.join_core(&y, |_key, &a, &b| Some((a, b)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        Tr2::Diff: Semigroup,
        R: Multiply<Tr2::Diff>,
        <R as Multiply<Tr2::Diff>>::Output: Semigroup,
        I: IntoIterator,
        I::Item: Data,
        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
        ;

    /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
    /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
    /// flexibility, but is more error-prone.
    ///
    /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
    /// which produces something implementing `IntoIterator`, where the output collection will have an entry
    /// for every value returned by the iterator.
    ///
    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
    /// contains the implementations for collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::arrange::ArrangeByKey;
    /// use differential_dataflow::operators::join::JoinCore;
    /// use differential_dataflow::trace::Trace;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
    ///                  .arrange_by_key();
    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
    ///                  .arrange_by_key();
    ///
    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
    ///
    ///     // Returned values have weight `a`
    ///     x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
    ///      .assert_eq(&z);
    /// });
    /// ```
    // NOTE(review): "unsafe" here appears to mean logically unsafe (the closure fully
    // controls output times and diffs) rather than memory-unsafe, since the method is
    // not an `unsafe fn` — confirm against the implementation's requirements on the
    // emitted `(data, time, diff)` triplets.
    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
    where
        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
        Tr2::Diff: Semigroup,
        D: Data,
        ROut: Semigroup,
        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
        L: for<'a> FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
        ;
}
287
288
289impl<G, K, V, R> JoinCore<G, K, V, R> for Collection<G, (K, V), R>
290where
291    G: Scope,
292    K: ExchangeData+Hashable,
293    V: ExchangeData,
294    R: ExchangeData+Semigroup,
295    G::Timestamp: Lattice+Ord,
296{
297    fn join_core<Tr2,I,L> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,I::Item,<R as Multiply<Tr2::Diff>>::Output>
298    where
299        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
300        Tr2::Diff: Semigroup,
301        R: Multiply<Tr2::Diff>,
302        <R as Multiply<Tr2::Diff>>::Output: Semigroup,
303        I: IntoIterator,
304        I::Item: Data,
305        L: FnMut(&K,&V,Tr2::Val<'_>)->I+'static,
306    {
307        self.arrange_by_key()
308            .join_core(stream2, result)
309    }
310
311    fn join_core_internal_unsafe<Tr2,I,L,D,ROut> (&self, stream2: &Arranged<G,Tr2>, result: L) -> Collection<G,D,ROut>
312    where
313        Tr2: for<'a> TraceReader<Key<'a>=&'a K, Time=G::Timestamp>+Clone+'static,
314        Tr2::Diff: Semigroup,
315        I: IntoIterator<Item=(D, G::Timestamp, ROut)>,
316        L: FnMut(&K,&V,Tr2::Val<'_>,&G::Timestamp,&R,&Tr2::Diff)->I+'static,
317        D: Data,
318        ROut: Semigroup,
319    {
320        self.arrange_by_key().join_core_internal_unsafe(stream2, result)
321    }
322}
323
/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys and values,
/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and
/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic
/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
pub fn join_traces<G, T1, T2, I,L,D,R>(arranged1: &Arranged<G,T1>, arranged2: &Arranged<G,T2>, mut result: L) -> Collection<G, D, R>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    T1: TraceReader<Time=G::Timestamp>+Clone+'static,
    T1::Diff: Semigroup,
    T2: for<'a> TraceReader<Key<'a>=T1::Key<'a>, Time=G::Timestamp>+Clone+'static,
    T2::Diff: Semigroup,
    D: Data,
    R: Semigroup,
    I: IntoIterator<Item=(D, G::Timestamp, R)>,
    L: FnMut(T1::Key<'_>,T1::Val<'_>,T2::Val<'_>,&G::Timestamp,&T1::Diff,&T2::Diff)->I+'static,
{
    // Rename traces for symmetry from here on out.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(&arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        use timely::scheduling::Activator;
        let activations = arranged1.stream.scope().activations().clone();
        let activator = Activator::new(&info.address[..], activations);

        // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
        // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
        // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
        // initial work for the two traces, and before the operator is constructed.

        // Acknowledged frontier for each input.
        // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
        // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
        // the physical compaction frontier of their corresponding trace.
        // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
        use timely::progress::frontier::Antichain;
        let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
        let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

        // deferred work of batches from each input.
        let mut todo1 = std::collections::VecDeque::new();
        let mut todo2 = std::collections::VecDeque::new();

        // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
        trace1.map_batches(|batch1| {
            acknowledged1.clone_from(batch1.upper());
            // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
            // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
            // Once we start streaming batches in, we will need to respond to new batches from
            // `input1` with logic that would have otherwise been here. Check out the next loop
            // for the structure.
        });
        // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

        // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
        // on both traces at the same time, as they could be the same trace and this would panic.
        let mut batch2_cursors = Vec::new();
        trace2.map_batches(|batch2| {
            acknowledged2.clone_from(batch2.upper());
            batch2_cursors.push((batch2.cursor(), batch2.clone()));
        });
        // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

        // Load up deferred work using trace2 cursors and batches captured just above.
        for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
            // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
            let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
            // We could downgrade the capability here, but doing so is a bit complicated mathematically.
            // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
            // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
            // that property.
            todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
        }

        // Droppable handles to shared trace data structures.
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);

        // Swappable buffers for input extraction.
        let mut input1_buffer = Vec::new();
        let mut input2_buffer = Vec::new();

        move |input1, input2, output| {

            // 1. Consuming input.
            //
            // The join computation repeatedly accepts batches of updates from each of its inputs.
            //
            // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
            // updates from its other input. It is important to track which updates have been accepted, because
            // we use a shared trace and there may be updates present that are in advance of this accepted bound.
            //
            // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
            // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
            // This last case is a consequence of our inability to transmit empty batches, as they may be formed
            // in the absence of timely dataflow capabilities.

            // Drain input 1, prepare work.
            input1.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain();
                    data.swap(&mut input1_buffer);
                    for batch1 in input1_buffer.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());
                        }
                    }
                }
                else { panic!("`trace2_option` dropped before `input1` emptied!"); }
            });

            // Drain input 2, prepare work.
            input2.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain();
                    data.swap(&mut input2_buffer);
                    for batch2 in input2_buffer.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());
                        }
                    }
                }
                else { panic!("`trace1_option` dropped before `input2` emptied!"); }
            });

            // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
            if let Some(trace1) = trace1_option.as_mut() {
                trace1.advance_upper(&mut acknowledged1);
            }
            if let Some(trace2) = trace2_option.as_mut() {
                trace2.advance_upper(&mut acknowledged2);
            }

            // 2. Join computation.
            //
            // For each of the inputs, we do some amount of work (measured in terms of number
            // of output records produced). This is meant to yield control to allow downstream
            // operators to consume and reduce the output, but it also means to provide some
            // degree of responsiveness. There is a potential risk here that if we fall behind
            // then the increasing queues hold back physical compaction of the underlying traces
            // which results in unintentionally quadratic processing time (each batch of either
            // input must scan all batches from the other input).

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo1.is_empty() && fuel > 0 {
                // `todo1` items pair a `trace2` cursor (first) with a `batch1` cursor (second),
                // so the work closure receives (key, val2, val1, ..) and we transpose the
                // arguments back into `result`'s (key, val1, val2, ..) order.
                todo1.front_mut().unwrap().work(
                    output,
                    |k,v2,v1,t,r2,r1| result(k,v1,v2,t,r1,r2),
                    &mut fuel
                );
                if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
            }

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo2.is_empty() && fuel > 0 {
                todo2.front_mut().unwrap().work(
                    output,
                    |k,v1,v2,t,r1,r2| result(k,v1,v2,t,r1,r2),
                    &mut fuel
                );
                if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
            }

            // Re-activate operator if work remains.
            if !todo1.is_empty() || !todo2.is_empty() {
                activator.activate();
            }

            // 3. Trace maintenance.
            //
            // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
            // the progress of an input, because should we ever drop one of the traces we will
            // lose the ability to extract information from anything other than the input.
            // For example, if we dropped `trace2` we would not be able to use `advance_upper`
            // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
            // compaction of `trace1`.

            // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
            if let Some(trace1) = trace1_option.as_mut() {
                if input2.frontier().is_empty() { trace1_option = None; }
                else {
                    // Allow `trace1` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input2`). All `input2` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace1.set_logical_compaction(input2.frontier().frontier());
                    // Allow `trace1` to compact physically up to the upper bound of batches we
                    // have received in its input (`input1`). We will not require a cursor that
                    // is not beyond this bound.
                    trace1.set_physical_compaction(acknowledged1.borrow());
                }
            }

            // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
            if let Some(trace2) = trace2_option.as_mut() {
                if input1.frontier().is_empty() { trace2_option = None;}
                else {
                    // Allow `trace2` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input1`). All `input1` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace2.set_logical_compaction(input1.frontier().frontier());
                    // Allow `trace2` to compact physically up to the upper bound of batches we
                    // have received in its input (`input2`). We will not require a cursor that
                    // is not beyond this bound.
                    trace2.set_physical_compaction(acknowledged2.borrow());
                }
            }
        }
    })
    .as_collection()
}
579
580
/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<T, R, C1, C2, D>
where
    T: Timestamp+Lattice+Ord,
    R: Semigroup,
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    C1::Diff: Semigroup,
    C2::Diff: Semigroup,
    D: Ord+Clone+Data,
{
    // Cursor over the previously accepted updates (the opposing trace).
    trace: C1,
    // Storage backing `trace`; the cursor reads through this.
    trace_storage: C1::Storage,
    // Cursor over the newly arrived batch being joined against `trace`.
    batch: C2,
    // Storage backing `batch`.
    batch_storage: C2::Storage,
    // Capability under which produced output updates are sent.
    capability: Capability<T>,
    // True once this work item is exhausted; reported by `work_remains`.
    done: bool,
    // Scratch buffer of pending `((data, time), diff)` updates, reused across calls to `work`.
    temp: Vec<((D, T), R)>,
}
604
impl<T, R, C1, C2, D> Deferred<T, R, C1, C2, D>
where
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    C1::Diff: Semigroup,
    C2::Diff: Semigroup,
    T: Timestamp+Lattice+Ord,
    R: Semigroup,
    D: Clone+Data,
{
    /// Creates a new deferred join over the two cursors and their storage,
    /// with `capability` fixing the time at which output will be sent.
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
            temp: Vec::new(),
        }
    }

    /// Reports whether there may still be unprocessed keys (i.e. `work` should be called again).
    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
    ///
    /// `logic` is invoked for each matched `(key, val1, val2, time, diff1, diff2)` and may
    /// produce any number of `(data, time, diff)` outputs, which are consolidated and sent
    /// at `self.capability`. On return, `fuel` is decremented by the number of tuples
    /// produced (saturating at zero), so the caller can meter work across calls.
    #[inline(never)]
    fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
    where 
        I: IntoIterator<Item=(D, T, R)>,
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I,
    {

        // Times from the trace are advanced to (joined with) the capability's time,
        // which is the earliest time at which this output can be sent anyway.
        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let temp = &mut self.temp;
        let mut thinker = JoinThinker::new();

        // Merge-join over keys: both cursors expose keys in increasing order, so we
        // seek each past the other until the keys match, then cross their histories.
        while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) && effort < *fuel {

            match trace.key(trace_storage).cmp(&batch.key(batch_storage)) {
                Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
                Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
                Ordering::Equal => {

                    // Load both value histories for the matched key; trace times are
                    // advanced to `meet` as they cannot be sent any earlier.
                    thinker.history1.edits.load(trace, trace_storage, |time| time.join(meet));
                    thinker.history2.edits.load(batch, batch_storage, |time| time.clone());

                    // `temp` is fully drained at the end of each key; anything left is a bug.
                    assert_eq!(temp.len(), 0);

                    // populate `temp` with the results in the best way we know how.
                    thinker.think(|v1,v2,t,r1,r2| {
                        let key = batch.key(batch_storage);
                        for (d, t, r) in logic(key, v1, v2, &t, r1, r2) {
                            temp.push(((d, t), r));
                        }
                    });

                    // TODO: This consolidation is optional, and it may not be very
                    //       helpful. We might try harder to understand whether we
                    //       should do this work here, or downstream at consumers.
                    // TODO: Perhaps `thinker` should have the buffer, do smarter
                    //       consolidation, and then deposit results in `session`.
                    crate::consolidation::consolidate(temp);

                    // Count produced tuples against the fuel budget, then emit them.
                    effort += temp.len();
                    for ((d, t), r) in temp.drain(..) {
                        session.give((d, t, r));
                    }

                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }

        // Done once either side is exhausted: no further key matches are possible.
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        // Deduct effort from fuel; a key may overshoot the budget, so saturate at zero.
        if effort > *fuel { *fuel = 0; }
        else              { *fuel -= effort; }
    }
}
700
/// Scratch state for computing the cross-product of two value histories for one key.
///
/// Holds the loaded edit histories of the two inputs; `think` combines them,
/// joining times pairwise, and hands each result to a caller-supplied closure.
struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
    C1::Time: Lattice+Ord+Clone,
    C1::Diff: Semigroup,
    C2::Diff: Semigroup,
{
    /// Edit history loaded from the first input's cursor.
    pub history1: ValueHistory<'a, C1>,
    /// Edit history loaded from the second input's cursor.
    pub history2: ValueHistory<'a, C2>,
}
712
impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
    C1::Time: Lattice+Ord+Clone,
    C1::Diff: Semigroup,
    C2::Diff: Semigroup,
{
    /// Creates a thinker with two empty histories, ready for `edits.load(..)`.
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Crosses the two loaded histories, calling `results(v1, v2, t1.join(t2), d1, d2)`
    /// for every pair of edits. Chooses between a brute-force nested loop (small
    /// histories) and a time-ordered replay that advances the "other" side's buffer
    /// to the current time, which can collapse many edits into fewer accumulated ones.
    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

        // for reasonably sized edits, do the dead-simple thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        }
        else {

            // Replay both histories in time order; at each step, pair the earlier
            // side's next edit against the later side's buffer advanced to its meet.
            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            //       in here. If a time is ever repeated, for example, the call will be identical
            //       and accomplish nothing. If only a single record has been added, it may not
            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
            //       is sufficient.

            while !replay1.is_done() && !replay2.is_done() {

                if replay1.time().unwrap().cmp(replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
                    // `replay1` is strictly earlier: pair its current edit with all of
                    // `replay2`'s buffered (accumulated) edits, then advance `replay1`.
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                }
                else {
                    // `replay2` is at or before `replay1`'s time: symmetric case.
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

            // Drain whichever history remains against the other's final buffer.
            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}
787}