Skip to main content

palimpsest_dataflow/operators/
join.rs

1//! Match pairs of records based on a key.
2//!
3//! The various `join` implementations require that the units of each collection can be multiplied, and that
4//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
5//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
6use std::cmp::Ordering;
7
8use timely::container::PushInto;
9use timely::dataflow::channels::pact::Pipeline;
10use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
11use timely::dataflow::operators::Capability;
12use timely::dataflow::{Scope, StreamCore};
13use timely::order::PartialOrder;
14use timely::progress::Timestamp;
15use timely::{Accountable, ContainerBuilder};
16
17use crate::difference::{Abelian, Multiply, Semigroup};
18use crate::hashable::Hashable;
19use crate::lattice::Lattice;
20use crate::operators::arrange::{ArrangeByKey, ArrangeBySelf, Arranged};
21use crate::operators::ValueHistory;
22use crate::trace::{BatchReader, Cursor};
23use crate::{Data, ExchangeData, VecCollection};
24
25use crate::trace::TraceReader;
26
27/// Join implementations for `(key,val)` data.
28pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {
29    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
30    ///
31    /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
32    ///
33    /// # Examples
34    ///
35    /// ```
36    /// use palimpsest_dataflow::input::Input;
37    /// use palimpsest_dataflow::operators::Join;
38    ///
39    /// ::timely::example(|scope| {
40    ///
41    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
42    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
43    ///     let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
44    ///
45    ///     x.join(&y)
46    ///      .assert_eq(&z);
47    /// });
48    /// ```
49    fn join<V2, R2>(
50        &self,
51        other: &VecCollection<G, (K, V2), R2>,
52    ) -> VecCollection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
53    where
54        K: ExchangeData,
55        V2: ExchangeData,
56        R2: ExchangeData + Semigroup,
57        R: Multiply<R2, Output: Semigroup + 'static>,
58    {
59        self.join_map(other, |k, v, v2| (k.clone(), (v.clone(), v2.clone())))
60    }
61
62    /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
63    ///
64    /// # Examples
65    ///
66    /// ```
67    /// use palimpsest_dataflow::input::Input;
68    /// use palimpsest_dataflow::operators::Join;
69    ///
70    /// ::timely::example(|scope| {
71    ///
72    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
73    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
74    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
75    ///
76    ///     x.join_map(&y, |_key, &a, &b| (a,b))
77    ///      .assert_eq(&z);
78    /// });
79    /// ```
80    fn join_map<V2, R2, D, L>(
81        &self,
82        other: &VecCollection<G, (K, V2), R2>,
83        logic: L,
84    ) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
85    where
86        K: ExchangeData,
87        V2: ExchangeData,
88        R2: ExchangeData + Semigroup,
89        R: Multiply<R2, Output: Semigroup + 'static>,
90        D: Data,
91        L: FnMut(&K, &V, &V2) -> D + 'static;
92
93    /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
94    ///
95    /// When the second collection contains frequencies that are either zero or one this is the more traditional
96    /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
97    /// the counts of the records in the first input.
98    ///
99    /// # Examples
100    ///
101    /// ```
102    /// use palimpsest_dataflow::input::Input;
103    /// use palimpsest_dataflow::operators::Join;
104    ///
105    /// ::timely::example(|scope| {
106    ///
107    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
108    ///     let y = scope.new_collection_from(vec![0, 2]).1;
109    ///     let z = scope.new_collection_from(vec![(0, 1)]).1;
110    ///
111    ///     x.semijoin(&y)
112    ///      .assert_eq(&z);
113    /// });
114    /// ```
115    fn semijoin<R2>(
116        &self,
117        other: &VecCollection<G, K, R2>,
118    ) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
119    where
120        K: ExchangeData,
121        R2: ExchangeData + Semigroup,
122        R: Multiply<R2, Output: Semigroup + 'static>;
123
124    /// Subtracts the semijoin with `other` from `self`.
125    ///
126    /// In the case that `other` has multiplicities zero or one this results
127    /// in a relational antijoin, in which we discard input records whose key
128    /// is present in `other`. If the multiplicities could be other than zero
129    /// or one, the semantic interpretation of this operator is less clear.
130    ///
131    /// In almost all cases, you should ensure that `other` has multiplicities
132    /// that are zero or one, perhaps by using the `distinct` operator.
133    ///
134    /// # Examples
135    ///
136    /// ```
137    /// use palimpsest_dataflow::input::Input;
138    /// use palimpsest_dataflow::operators::Join;
139    ///
140    /// ::timely::example(|scope| {
141    ///
142    ///     let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
143    ///     let y = scope.new_collection_from(vec![0, 2]).1;
144    ///     let z = scope.new_collection_from(vec![(1, 3)]).1;
145    ///
146    ///     x.antijoin(&y)
147    ///      .assert_eq(&z);
148    /// });
149    /// ```
150    fn antijoin<R2>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), R>
151    where
152        K: ExchangeData,
153        R2: ExchangeData + Semigroup,
154        R: Multiply<R2, Output = R>,
155        R: Abelian + 'static;
156}
157
158impl<G, K, V, R> Join<G, K, V, R> for VecCollection<G, (K, V), R>
159where
160    G: Scope<Timestamp: Lattice + Ord>,
161    K: ExchangeData + Hashable,
162    V: ExchangeData,
163    R: ExchangeData + Semigroup,
164{
165    fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
166        &self,
167        other: &VecCollection<G, (K, V2), R2>,
168        mut logic: L,
169    ) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
170    where
171        R: Multiply<R2, Output: Semigroup + 'static>,
172        L: FnMut(&K, &V, &V2) -> D + 'static,
173    {
174        let arranged1 = self.arrange_by_key();
175        let arranged2 = other.arrange_by_key();
176        arranged1.join_core(&arranged2, move |k, v1, v2| Some(logic(k, v1, v2)))
177    }
178
179    fn semijoin<R2: ExchangeData + Semigroup>(
180        &self,
181        other: &VecCollection<G, K, R2>,
182    ) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
183    where
184        R: Multiply<R2, Output: Semigroup + 'static>,
185    {
186        let arranged1 = self.arrange_by_key();
187        let arranged2 = other.arrange_by_self();
188        arranged1.join_core(&arranged2, |k, v, _| Some((k.clone(), v.clone())))
189    }
190
191    fn antijoin<R2: ExchangeData + Semigroup>(
192        &self,
193        other: &VecCollection<G, K, R2>,
194    ) -> VecCollection<G, (K, V), R>
195    where
196        R: Multiply<R2, Output = R>,
197        R: Abelian + 'static,
198    {
199        self.concat(&self.semijoin(other).negate())
200    }
201}
202
203impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
204where
205    G: Scope<Timestamp = Tr::Time>,
206    Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static,
207    K: ExchangeData + Hashable,
208    V: Data + 'static,
209{
210    fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
211        &self,
212        other: &VecCollection<G, (K, V2), R2>,
213        mut logic: L,
214    ) -> VecCollection<G, D, <Tr::Diff as Multiply<R2>>::Output>
215    where
216        Tr::Diff: Multiply<R2, Output: Semigroup + 'static>,
217        L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2) -> D + 'static,
218    {
219        let arranged2 = other.arrange_by_key();
220        self.join_core(&arranged2, move |k, v1, v2| Some(logic(k, v1, v2)))
221    }
222
223    fn semijoin<R2: ExchangeData + Semigroup>(
224        &self,
225        other: &VecCollection<G, K, R2>,
226    ) -> VecCollection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
227    where
228        Tr::Diff: Multiply<R2, Output: Semigroup + 'static>,
229    {
230        let arranged2 = other.arrange_by_self();
231        self.join_core(&arranged2, |k, v, _| Some((k.clone(), v.clone())))
232    }
233
234    fn antijoin<R2: ExchangeData + Semigroup>(
235        &self,
236        other: &VecCollection<G, K, R2>,
237    ) -> VecCollection<G, (K, V), Tr::Diff>
238    where
239        Tr::Diff: Multiply<R2, Output = Tr::Diff>,
240        Tr::Diff: Abelian + 'static,
241    {
242        self.as_collection(|k, v| (k.clone(), v.clone()))
243            .concat(&self.semijoin(other).negate())
244    }
245}
246
247/// Matches the elements of two arranged traces.
248///
249/// This method is used by the various `join` implementations, but it can also be used
250/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
251/// the arrangement is available for re-use, or from the output of a `reduce` operator.
252pub trait JoinCore<
253    G: Scope<Timestamp: Lattice + Ord>,
254    K: 'static + ?Sized,
255    V: 'static + ?Sized,
256    R: Semigroup,
257>
258{
259    /// Joins two arranged collections with the same key type.
260    ///
261    /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
262    /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
263    /// every value returned by the iterator.
264    ///
265    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
266    /// contains the implementations for collections.
267    ///
268    /// # Examples
269    ///
270    /// ```
271    /// use palimpsest_dataflow::input::Input;
272    /// use palimpsest_dataflow::operators::arrange::ArrangeByKey;
273    /// use palimpsest_dataflow::operators::join::JoinCore;
274    /// use palimpsest_dataflow::trace::Trace;
275    ///
276    /// ::timely::example(|scope| {
277    ///
278    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
279    ///                  .arrange_by_key();
280    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
281    ///                  .arrange_by_key();
282    ///
283    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
284    ///
285    ///     x.join_core(&y, |_key, &a, &b| Some((a, b)))
286    ///      .assert_eq(&z);
287    /// });
288    /// ```
289    fn join_core<Tr2, I, L>(
290        &self,
291        stream2: &Arranged<G, Tr2>,
292        result: L,
293    ) -> VecCollection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
294    where
295        Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
296        R: Multiply<Tr2::Diff, Output: Semigroup + 'static>,
297        I: IntoIterator<Item: Data>,
298        L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static;
299
300    /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
301    /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
302    /// flexibility, but is more error-prone.
303    ///
304    /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
305    /// which produces something implementing `IntoIterator`, where the output collection will have an entry
306    /// for every value returned by the iterator.
307    ///
308    /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
309    /// contains the implementations for collections.
310    ///
311    /// # Examples
312    ///
313    /// ```
314    /// use palimpsest_dataflow::input::Input;
315    /// use palimpsest_dataflow::operators::arrange::ArrangeByKey;
316    /// use palimpsest_dataflow::operators::join::JoinCore;
317    /// use palimpsest_dataflow::trace::Trace;
318    ///
319    /// ::timely::example(|scope| {
320    ///
321    ///     let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
322    ///                  .arrange_by_key();
323    ///     let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
324    ///                  .arrange_by_key();
325    ///
326    ///     let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
327    ///
328    ///     // Returned values have weight `a`
329    ///     x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
330    ///      .assert_eq(&z);
331    /// });
332    /// ```
333    fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
334        &self,
335        stream2: &Arranged<G, Tr2>,
336        result: L,
337    ) -> VecCollection<G, D, ROut>
338    where
339        Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
340        D: Data,
341        ROut: Semigroup + 'static,
342        I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
343        L: for<'a> FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static;
344}
345
346impl<G, K, V, R> JoinCore<G, K, V, R> for VecCollection<G, (K, V), R>
347where
348    G: Scope<Timestamp: Lattice + Ord>,
349    K: ExchangeData + Hashable,
350    V: ExchangeData,
351    R: ExchangeData + Semigroup,
352{
353    fn join_core<Tr2, I, L>(
354        &self,
355        stream2: &Arranged<G, Tr2>,
356        result: L,
357    ) -> VecCollection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
358    where
359        Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
360        R: Multiply<Tr2::Diff, Output: Semigroup + 'static>,
361        I: IntoIterator<Item: Data>,
362        L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static,
363    {
364        self.arrange_by_key().join_core(stream2, result)
365    }
366
367    fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
368        &self,
369        stream2: &Arranged<G, Tr2>,
370        result: L,
371    ) -> VecCollection<G, D, ROut>
372    where
373        Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
374        I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
375        L: FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static,
376        D: Data,
377        ROut: Semigroup + 'static,
378    {
379        self.arrange_by_key()
380            .join_core_internal_unsafe(stream2, result)
381    }
382}
383
384/// The session passed to join closures.
385pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder<CB>, CT>;
386
/// A container builder wrapper that keeps a running tally of produced output, used to estimate
/// the effort spent in join closures.
///
/// The tally lives in a [`Cell`](std::cell::Cell), so it can be read or reset through a shared
/// reference. The `ContainerBuilder` implementation for this type adds the record count of each
/// container it extracts or finishes to the tally.
#[derive(Debug, Default)]
pub struct EffortBuilder<CB>(
    /// Running tally of output records.
    pub std::cell::Cell<usize>,
    /// The wrapped container builder that does the actual work.
    pub CB,
);
390
391impl<CB: ContainerBuilder> timely::container::ContainerBuilder for EffortBuilder<CB> {
392    type Container = CB::Container;
393
394    #[inline]
395    fn extract(&mut self) -> Option<&mut Self::Container> {
396        let extracted = self.1.extract();
397        self.0
398            .replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.record_count() as usize));
399        extracted
400    }
401
402    #[inline]
403    fn finish(&mut self) -> Option<&mut Self::Container> {
404        let finished = self.1.finish();
405        self.0
406            .replace(self.0.take() + finished.as_ref().map_or(0, |e| e.record_count() as usize));
407        finished
408    }
409}
410
411impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
412    #[inline]
413    fn push_into(&mut self, item: D) {
414        self.1.push_into(item);
415    }
416}
417
418/// An equijoin of two traces, sharing a common key type.
419///
420/// This method exists to provide join functionality without opinions on the specific input types, keys and values,
421/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and
422/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic
423/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments.
424///
425/// The implementation produces a caller-specified container. Implementations can use [`AsCollection`] to wrap the
426/// output stream in a collection.
427///
428/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
429///
430/// [`AsCollection`]: crate::collection::AsCollection
431pub fn join_traces<G, T1, T2, L, CB>(
432    arranged1: &Arranged<G, T1>,
433    arranged2: &Arranged<G, T2>,
434    mut result: L,
435) -> StreamCore<G, CB::Container>
436where
437    G: Scope<Timestamp = T1::Time>,
438    T1: TraceReader + Clone + 'static,
439    T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
440    L: FnMut(
441            T1::Key<'_>,
442            T1::Val<'_>,
443            T2::Val<'_>,
444            &G::Timestamp,
445            &T1::Diff,
446            &T2::Diff,
447            &mut JoinSession<T1::Time, CB, Capability<T1::Time>>,
448        ) + 'static,
449    CB: ContainerBuilder,
450{
451    // Rename traces for symmetry from here on out.
452    let mut trace1 = arranged1.trace.clone();
453    let mut trace2 = arranged2.trace.clone();
454
455    arranged1.stream.binary_frontier(
456        &arranged2.stream,
457        Pipeline,
458        Pipeline,
459        "Join",
460        move |capability, info| {
461            // Acquire an activator to reschedule the operator when it has unfinished work.
462            use timely::scheduling::Activator;
463            let activations = arranged1.stream.scope().activations().clone();
464            let activator = Activator::new(info.address, activations);
465
466            // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
467            // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
468            // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
469            // initial work for the two traces, and before the operator is constructed.
470
471            // Acknowledged frontier for each input.
472            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
473            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
474            // the physical compaction frontier of their corresponding trace.
475            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
476            use timely::progress::frontier::Antichain;
477            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
478            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());
479
480            // deferred work of batches from each input.
481            let mut todo1 = std::collections::VecDeque::new();
482            let mut todo2 = std::collections::VecDeque::new();
483
484            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
485            trace1.map_batches(|batch1| {
486                acknowledged1.clone_from(batch1.upper());
487                // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
488                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
489                // Once we start streaming batches in, we will need to respond to new batches from
490                // `input1` with logic that would have otherwise been here. Check out the next loop
491                // for the structure.
492            });
493            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
494            // iterating through batches and capturing the upper bound. This is a great moment to assert that
495            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
496            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
497            assert!(PartialOrder::less_equal(
498                &trace1.get_physical_compaction(),
499                &acknowledged1.borrow()
500            ));
501
502            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
503            // on both traces at the same time, as they could be the same trace and this would panic.
504            let mut batch2_cursors = Vec::new();
505            trace2.map_batches(|batch2| {
506                acknowledged2.clone_from(batch2.upper());
507                batch2_cursors.push((batch2.cursor(), batch2.clone()));
508            });
509            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
510            // iterating through batches and capturing the upper bound. This is a great moment to assert that
511            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
512            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
513            assert!(PartialOrder::less_equal(
514                &trace2.get_physical_compaction(),
515                &acknowledged2.borrow()
516            ));
517
518            // Load up deferred work using trace2 cursors and batches captured just above.
519            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
520                // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
521                let (trace1_cursor, trace1_storage) =
522                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
523                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
524                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
525                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
526                // that property.
527                todo2.push_back(Deferred::new(
528                    trace1_cursor,
529                    trace1_storage,
530                    batch2_cursor,
531                    batch2.clone(),
532                    capability.clone(),
533                ));
534            }
535
536            // Droppable handles to shared trace data structures.
537            let mut trace1_option = Some(trace1);
538            let mut trace2_option = Some(trace2);
539
540            move |(input1, frontier1), (input2, frontier2), output| {
541                // 1. Consuming input.
542                //
543                // The join computation repeatedly accepts batches of updates from each of its inputs.
544                //
545                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
546                // updates from its other input. It is important to track which updates have been accepted, because
547                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
548                //
549                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
550                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
551                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
552                // in the absence of timely dataflow capabilities.
553
554                // Drain input 1, prepare work.
555                input1.for_each(|capability, data| {
556                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
557                    if let Some(ref mut trace2) = trace2_option {
558                        let capability = capability.retain();
559                        for batch1 in data.drain(..) {
560                            // Ignore any pre-loaded data.
561                            if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
562                                if !batch1.is_empty() {
563                                    // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
564                                    // at start-up, and have held back physical compaction ever since.
565                                    let (trace2_cursor, trace2_storage) =
566                                        trace2.cursor_through(acknowledged2.borrow()).unwrap();
567                                    let batch1_cursor = batch1.cursor();
568                                    todo1.push_back(Deferred::new(
569                                        trace2_cursor,
570                                        trace2_storage,
571                                        batch1_cursor,
572                                        batch1.clone(),
573                                        capability.clone(),
574                                    ));
575                                }
576
577                                // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
578                                // may have skipped over empty batches. Still, the batches are in-order, and we should be
579                                // able to just assume the most recent `batch1.upper`
580                                debug_assert!(PartialOrder::less_equal(
581                                    &acknowledged1,
582                                    batch1.upper()
583                                ));
584                                acknowledged1.clone_from(batch1.upper());
585                            }
586                        }
587                    } else {
588                        panic!("`trace2_option` dropped before `input1` emptied!");
589                    }
590                });
591
592                // Drain input 2, prepare work.
593                input2.for_each(|capability, data| {
594                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
595                    if let Some(ref mut trace1) = trace1_option {
596                        let capability = capability.retain();
597                        for batch2 in data.drain(..) {
598                            // Ignore any pre-loaded data.
599                            if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
600                                if !batch2.is_empty() {
601                                    // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
602                                    // at start-up, and have held back physical compaction ever since.
603                                    let (trace1_cursor, trace1_storage) =
604                                        trace1.cursor_through(acknowledged1.borrow()).unwrap();
605                                    let batch2_cursor = batch2.cursor();
606                                    todo2.push_back(Deferred::new(
607                                        trace1_cursor,
608                                        trace1_storage,
609                                        batch2_cursor,
610                                        batch2.clone(),
611                                        capability.clone(),
612                                    ));
613                                }
614
615                                // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
616                                // may have skipped over empty batches. Still, the batches are in-order, and we should be
617                                // able to just assume the most recent `batch2.upper`
618                                debug_assert!(PartialOrder::less_equal(
619                                    &acknowledged2,
620                                    batch2.upper()
621                                ));
622                                acknowledged2.clone_from(batch2.upper());
623                            }
624                        }
625                    } else {
626                        panic!("`trace1_option` dropped before `input2` emptied!");
627                    }
628                });
629
630                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
631                if let Some(trace1) = trace1_option.as_mut() {
632                    trace1.advance_upper(&mut acknowledged1);
633                }
634                if let Some(trace2) = trace2_option.as_mut() {
635                    trace2.advance_upper(&mut acknowledged2);
636                }
637
638                // 2. Join computation.
639                //
640                // For each of the inputs, we do some amount of work (measured in terms of number
641                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it is also meant to provide some
643                // degree of responsiveness. There is a potential risk here that if we fall behind
644                // then the increasing queues hold back physical compaction of the underlying traces
645                // which results in unintentionally quadratic processing time (each batch of either
646                // input must scan all batches from the other input).
647
648                // Perform some amount of outstanding work.
649                let mut fuel = 1_000_000;
650                while !todo1.is_empty() && fuel > 0 {
651                    todo1.front_mut().unwrap().work(
652                        output,
653                        |k, v2, v1, t, r2, r1, c| result(k, v1, v2, t, r1, r2, c),
654                        &mut fuel,
655                    );
656                    if !todo1.front().unwrap().work_remains() {
657                        todo1.pop_front();
658                    }
659                }
660
661                // Perform some amount of outstanding work.
662                let mut fuel = 1_000_000;
663                while !todo2.is_empty() && fuel > 0 {
664                    todo2.front_mut().unwrap().work(
665                        output,
666                        |k, v1, v2, t, r1, r2, c| result(k, v1, v2, t, r1, r2, c),
667                        &mut fuel,
668                    );
669                    if !todo2.front().unwrap().work_remains() {
670                        todo2.pop_front();
671                    }
672                }
673
674                // Re-activate operator if work remains.
675                if !todo1.is_empty() || !todo2.is_empty() {
676                    activator.activate();
677                }
678
679                // 3. Trace maintenance.
680                //
681                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
682                // the progress of an input, because should we ever drop one of the traces we will
683                // lose the ability to extract information from anything other than the input.
684                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
685                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
686                // compaction of `trace1`.
687
688                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
689                if let Some(trace1) = trace1_option.as_mut() {
690                    if frontier2.is_empty() {
691                        trace1_option = None;
692                    } else {
693                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
694                        // in the opposing input (`input2`). All `input2` times will be beyond this
695                        // frontier, and joined times only need to be accurate when advanced to it.
696                        trace1.set_logical_compaction(frontier2.frontier());
697                        // Allow `trace1` to compact physically up to the upper bound of batches we
698                        // have received in its input (`input1`). We will not require a cursor that
699                        // is not beyond this bound.
700                        trace1.set_physical_compaction(acknowledged1.borrow());
701                    }
702                }
703
704                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
705                if let Some(trace2) = trace2_option.as_mut() {
706                    if frontier1.is_empty() {
707                        trace2_option = None;
708                    } else {
709                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
710                        // in the opposing input (`input1`). All `input1` times will be beyond this
711                        // frontier, and joined times only need to be accurate when advanced to it.
712                        trace2.set_logical_compaction(frontier1.frontier());
713                        // Allow `trace2` to compact physically up to the upper bound of batches we
714                        // have received in its input (`input2`). We will not require a cursor that
715                        // is not beyond this bound.
716                        trace2.set_physical_compaction(acknowledged2.borrow());
717                    }
718                }
719            }
720        },
721    )
722}
723
724/// Deferred join computation.
725///
726/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
727/// This allows us to avoid producing and buffering massive amounts of data, without giving the timely
728/// dataflow system a chance to run operators that can consume and aggregate the data.
729struct Deferred<T, C1, C2>
730where
731    T: Timestamp + Lattice + Ord,
732    C1: Cursor<Time = T>,
733    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T>,
734{
735    trace: C1,
736    trace_storage: C1::Storage,
737    batch: C2,
738    batch_storage: C2::Storage,
739    capability: Capability<T>,
740    done: bool,
741}
742
743impl<T, C1, C2> Deferred<T, C1, C2>
744where
745    C1: Cursor<Time = T>,
746    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T>,
747    T: Timestamp + Lattice + Ord,
748{
749    fn new(
750        trace: C1,
751        trace_storage: C1::Storage,
752        batch: C2,
753        batch_storage: C2::Storage,
754        capability: Capability<T>,
755    ) -> Self {
756        Deferred {
757            trace,
758            trace_storage,
759            batch,
760            batch_storage,
761            capability,
762            done: false,
763        }
764    }
765
766    fn work_remains(&self) -> bool {
767        !self.done
768    }
769
770    /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
771    #[inline(never)]
772    fn work<L, CB: ContainerBuilder>(
773        &mut self,
774        output: &mut OutputBuilderSession<T, EffortBuilder<CB>>,
775        mut logic: L,
776        fuel: &mut usize,
777    ) where
778        L: for<'a> FnMut(
779            C1::Key<'a>,
780            C1::Val<'a>,
781            C2::Val<'a>,
782            &T,
783            &C1::Diff,
784            &C2::Diff,
785            &mut JoinSession<T, CB, Capability<T>>,
786        ),
787    {
788        let meet = self.capability.time();
789
790        let mut effort = 0;
791        let mut session = output.session_with_builder(&self.capability);
792
793        let trace_storage = &self.trace_storage;
794        let batch_storage = &self.batch_storage;
795
796        let trace = &mut self.trace;
797        let batch = &mut self.batch;
798
799        let mut thinker = JoinThinker::new();
800
801        while let (Some(batch_key), Some(trace_key), true) = (
802            batch.get_key(batch_storage),
803            trace.get_key(trace_storage),
804            effort < *fuel,
805        ) {
806            match trace_key.cmp(&batch_key) {
807                Ordering::Less => trace.seek_key(trace_storage, batch_key),
808                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
809                Ordering::Equal => {
810                    thinker.history1.edits.load(trace, trace_storage, |time| {
811                        let mut time = C1::owned_time(time);
812                        time.join_assign(meet);
813                        time
814                    });
815                    thinker
816                        .history2
817                        .edits
818                        .load(batch, batch_storage, |time| C2::owned_time(time));
819
820                    // populate `temp` with the results in the best way we know how.
821                    thinker.think(|v1, v2, t, r1, r2| {
822                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
823                    });
824
825                    // TODO: Effort isn't perfectly tracked as we might still have some data in the
826                    // session at the moment it's dropped.
827                    effort += session.builder().0.take();
828                    batch.step_key(batch_storage);
829                    trace.step_key(trace_storage);
830
831                    thinker.history1.clear();
832                    thinker.history2.clear();
833                }
834            }
835        }
836        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);
837
838        if effort > *fuel {
839            *fuel = 0;
840        } else {
841            *fuel -= effort;
842        }
843    }
844}
845
846struct JoinThinker<'a, C1, C2>
847where
848    C1: Cursor,
849    C2: Cursor<Time = C1::Time>,
850{
851    pub history1: ValueHistory<'a, C1>,
852    pub history2: ValueHistory<'a, C2>,
853}
854
855impl<'a, C1, C2> JoinThinker<'a, C1, C2>
856where
857    C1: Cursor,
858    C2: Cursor<Time = C1::Time>,
859{
860    fn new() -> Self {
861        JoinThinker {
862            history1: ValueHistory::new(),
863            history2: ValueHistory::new(),
864        }
865    }
866
867    fn think<F: FnMut(C1::Val<'a>, C2::Val<'a>, C1::Time, &C1::Diff, &C2::Diff)>(
868        &mut self,
869        mut results: F,
870    ) {
871        // for reasonably sized edits, do the dead-simple thing.
872        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
873            self.history1.edits.map(|v1, t1, d1| {
874                self.history2.edits.map(|v2, t2, d2| {
875                    results(v1, v2, t1.join(t2), d1, d2);
876                })
877            })
878        } else {
879            let mut replay1 = self.history1.replay();
880            let mut replay2 = self.history2.replay();
881
882            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
883            //       in here. If a time is ever repeated, for example, the call will be identical
884            //       and accomplish nothing. If only a single record has been added, it may not
885            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
886            //       is sufficient.
887
888            while !replay1.is_done() && !replay2.is_done() {
889                if replay1.time().unwrap().cmp(replay2.time().unwrap())
890                    == ::std::cmp::Ordering::Less
891                {
892                    replay2.advance_buffer_by(replay1.meet().unwrap());
893                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
894                        let (val1, time1, diff1) = replay1.edit().unwrap();
895                        results(val1, val2, time1.join(time2), diff1, diff2);
896                    }
897                    replay1.step();
898                } else {
899                    replay1.advance_buffer_by(replay2.meet().unwrap());
900                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
901                        let (val2, time2, diff2) = replay2.edit().unwrap();
902                        results(val1, val2, time1.join(time2), diff1, diff2);
903                    }
904                    replay2.step();
905                }
906            }
907
908            while !replay1.is_done() {
909                replay2.advance_buffer_by(replay1.meet().unwrap());
910                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
911                    let (val1, time1, diff1) = replay1.edit().unwrap();
912                    results(val1, val2, time1.join(time2), diff1, diff2);
913                }
914                replay1.step();
915            }
916            while !replay2.is_done() {
917                replay1.advance_buffer_by(replay2.meet().unwrap());
918                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
919                    let (val2, time2, diff2) = replay2.edit().unwrap();
920                    results(val1, val2, time1.join(time2), diff1, diff2);
921                }
922                replay2.step();
923            }
924        }
925    }
926}