// differential_dataflow/operators/join.rs

//! Match pairs of records based on a key.
//!
//! The various `join` implementations require that the units of each collection can be multiplied, and that
//! the multiplication distributes over addition. That is, we will repeatedly evaluate (a + b) * c as (a * c)
//! + (b * c), and if this is not equal to the former term, little is known about the actual output.
use std::cmp::Ordering;

use timely::{Accountable, ContainerBuilder};
use timely::container::PushInto;
use timely::order::PartialOrder;
use timely::progress::Timestamp;
use timely::dataflow::Stream;
use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::Capability;

use crate::lattice::Lattice;
use crate::operators::arrange::Arranged;
use crate::trace::{BatchReader, Cursor};
use crate::operators::ValueHistory;

use crate::trace::TraceReader;
/// The session passed to join closures.
///
/// Wraps the raw output `Session` with an [`EffortBuilder`], letting the join
/// operator meter how many records the user closure produced (the count is
/// drained in `Deferred::work` to account `fuel`).
pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder<CB>, CT>;
26
/// A container builder that tracks the length of outputs to estimate the effort of join closures.
///
/// Field `0` accumulates the number of records observed in extracted/finished
/// containers; it is a `Cell` so the count can be drained (`take`) through a
/// shared reference to the builder. Field `1` is the wrapped builder that
/// actually stores the data.
#[derive(Default, Debug)]
pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);
30
31impl<CB: ContainerBuilder> timely::container::ContainerBuilder for EffortBuilder<CB> {
32    type Container = CB::Container;
33
34    #[inline]
35    fn extract(&mut self) -> Option<&mut Self::Container> {
36        let extracted = self.1.extract();
37        self.0.replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.record_count() as usize));
38        extracted
39    }
40
41    #[inline]
42    fn finish(&mut self) -> Option<&mut Self::Container> {
43        let finished = self.1.finish();
44        self.0.replace(self.0.take() + finished.as_ref().map_or(0, |e| e.record_count() as usize));
45        finished
46    }
47}
48
/// Pushing into an `EffortBuilder` simply forwards to the wrapped builder;
/// effort is accounted in `extract`/`finish`, not on push.
impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
    #[inline]
    fn push_into(&mut self, item: D) {
        // Delegate to the inner builder (field 1); the counter (field 0) is untouched here.
        self.1.push_into(item);
    }
}
55
/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys and values,
/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and
/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic
/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments.
///
/// The implementation produces a caller-specified container. Implementations can use [`AsCollection`] to wrap the
/// output stream in a collection.
///
/// The "correctness" of this method depends heavily on the behavior of the supplied `result` function.
///
/// [`AsCollection`]: crate::collection::AsCollection
pub fn join_traces<'scope, Tr1, Tr2, L, CB>(arranged1: Arranged<'scope, Tr1>, arranged2: Arranged<'scope, Tr2>, mut result: L) -> Stream<'scope, Tr1::Time, CB::Container>
where
    Tr1: TraceReader+Clone+'static,
    Tr2: for<'a> TraceReader<Key<'a>=Tr1::Key<'a>, Time = Tr1::Time>+Clone+'static,
    L: FnMut(Tr1::Key<'_>,Tr1::Val<'_>,Tr2::Val<'_>,&Tr1::Time,&Tr1::Diff,&Tr2::Diff,&mut JoinSession<Tr1::Time, CB, Capability<Tr1::Time>>)+'static,
    CB: ContainerBuilder,
{
    // Rename traces for symmetry from here on out.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    let scope = arranged1.stream.scope();
    arranged1.stream.binary_frontier(arranged2.stream, Pipeline, Pipeline, "Join", move |capability, info| {

        // Acquire an activator to reschedule the operator when it has unfinished work.
        use timely::scheduling::Activator;
        let activations = scope.activations().clone();
        let activator = Activator::new(info.address, activations);

        // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
        // These invariants ensure that we can reference observed batch frontiers from `_start_upper` onward, as long as
        // we maintain our physical compaction capabilities appropriately. These assertions are tested as we load up the
        // initial work for the two traces, and before the operator is constructed.

        // Acknowledged frontier for each input.
        // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
        // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
        // the physical compaction frontier of their corresponding trace.
        // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
        use timely::progress::frontier::Antichain;
        let mut acknowledged1 = Antichain::from_elem(Tr1::Time::minimum());
        let mut acknowledged2 = Antichain::from_elem(Tr1::Time::minimum());

        // deferred work of batches from each input.
        let mut todo1 = std::collections::VecDeque::new();
        let mut todo2 = std::collections::VecDeque::new();

        // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
        trace1.map_batches(|batch1| {
            acknowledged1.clone_from(batch1.upper());
            // No `todo1` work here, because we haven't accepted anything into `batches2` yet.
            // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
            // Once we start streaming batches in, we will need to respond to new batches from
            // `input1` with logic that would have otherwise been here. Check out the next loop
            // for the structure.
        });
        // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace1.get_physical_compaction(), &acknowledged1.borrow()));

        // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
        // on both traces at the same time, as they could be the same trace and this would panic.
        let mut batch2_cursors = Vec::new();
        trace2.map_batches(|batch2| {
            acknowledged2.clone_from(batch2.upper());
            batch2_cursors.push((batch2.cursor(), batch2.clone()));
        });
        // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
        // iterating through batches and capturing the upper bound. This is a great moment to assert that
        // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
        // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
        assert!(PartialOrder::less_equal(&trace2.get_physical_compaction(), &acknowledged2.borrow()));

        // Load up deferred work using trace2 cursors and batches captured just above.
        for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
            // It is safe to ask for `ack1` because we have confirmed it to be in advance of `distinguish_since`.
            let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
            // We could downgrade the capability here, but doing so is a bit complicated mathematically.
            // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
            // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
            // that property.
            todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
        }

        // Droppable handles to shared trace data structures.
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);

        move |(input1, frontier1), (input2, frontier2), output| {

            // 1. Consuming input.
            //
            // The join computation repeatedly accepts batches of updates from each of its inputs.
            //
            // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
            // updates from its other input. It is important to track which updates have been accepted, because
            // we use a shared trace and there may be updates present that are in advance of this accepted bound.
            //
            // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
            // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
            // This last case is a consequence of our inability to transmit empty batches, as they may be formed
            // in the absence of timely dataflow capabilities.

            // Drain input 1, prepare work.
            input1.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace2) = trace2_option {
                    let capability = capability.retain(0);
                    for batch1 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                            if !batch1.is_empty() {
                                // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace2_cursor, trace2_storage) = trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                let batch1_cursor = batch1.cursor();
                                todo1.push_back(Deferred::new(trace2_cursor, trace2_storage, batch1_cursor, batch1.clone(), capability.clone()));
                            }

                            // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch1.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged1, batch1.upper()));
                            acknowledged1.clone_from(batch1.upper());
                        }
                    }
                }
                else { panic!("`trace2_option` dropped before `input1` emptied!"); }
            });

            // Drain input 2, prepare work.
            input2.for_each(|capability, data| {
                // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                if let Some(ref mut trace1) = trace1_option {
                    let capability = capability.retain(0);
                    for batch2 in data.drain(..) {
                        // Ignore any pre-loaded data.
                        if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                            if !batch2.is_empty() {
                                // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                // at start-up, and have held back physical compaction ever since.
                                let (trace1_cursor, trace1_storage) = trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                let batch2_cursor = batch2.cursor();
                                todo2.push_back(Deferred::new(trace1_cursor, trace1_storage, batch2_cursor, batch2.clone(), capability.clone()));
                            }

                            // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                            // may have skipped over empty batches. Still, the batches are in-order, and we should be
                            // able to just assume the most recent `batch2.upper`
                            debug_assert!(PartialOrder::less_equal(&acknowledged2, batch2.upper()));
                            acknowledged2.clone_from(batch2.upper());
                        }
                    }
                }
                else { panic!("`trace1_option` dropped before `input2` emptied!"); }
            });

            // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
            if let Some(trace1) = trace1_option.as_mut() {
                trace1.advance_upper(&mut acknowledged1);
            }
            if let Some(trace2) = trace2_option.as_mut() {
                trace2.advance_upper(&mut acknowledged2);
            }

            // 2. Join computation.
            //
            // For each of the inputs, we do some amount of work (measured in terms of number
            // of output records produced). This is meant to yield control to allow downstream
            // operators to consume and reduce the output, but it it also means to provide some
            // degree of responsiveness. There is a potential risk here that if we fall behind
            // then the increasing queues hold back physical compaction of the underlying traces
            // which results in unintentionally quadratic processing time (each batch of either
            // input must scan all batches from the other input).

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo1.is_empty() && fuel > 0 {
                todo1.front_mut().unwrap().work(
                    output,
                    // `todo1` joins input1 batches against `trace2`, so `work` hands us
                    // (key, trace2 val, batch1 val, ..); swap the value and diff arguments
                    // so `result` always observes (key, val1, val2, t, r1, r2, session).
                    |k,v2,v1,t,r2,r1,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo1.front().unwrap().work_remains() { todo1.pop_front(); }
            }

            // Perform some amount of outstanding work.
            let mut fuel = 1_000_000;
            while !todo2.is_empty() && fuel > 0 {
                todo2.front_mut().unwrap().work(
                    output,
                    // `todo2` joins input2 batches against `trace1`, so the arguments already
                    // arrive in (key, val1, val2, ..) order and need no swap.
                    |k,v1,v2,t,r1,r2,c| result(k,v1,v2,t,r1,r2,c),
                    &mut fuel
                );
                if !todo2.front().unwrap().work_remains() { todo2.pop_front(); }
            }

            // Re-activate operator if work remains.
            if !todo1.is_empty() || !todo2.is_empty() {
                activator.activate();
            }

            // 3. Trace maintenance.
            //
            // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
            // the progress of an input, because should we ever drop one of the traces we will
            // lose the ability to extract information from anything other than the input.
            // For example, if we dropped `trace2` we would not be able to use `advance_upper`
            // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
            // compaction of `trace1`.

            // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
            if let Some(trace1) = trace1_option.as_mut() {
                if frontier2.is_empty() { trace1_option = None; }
                else {
                    // Allow `trace1` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input2`). All `input2` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace1.set_logical_compaction(frontier2.frontier());
                    // Allow `trace1` to compact physically up to the upper bound of batches we
                    // have received in its input (`input1`). We will not require a cursor that
                    // is not beyond this bound.
                    trace1.set_physical_compaction(acknowledged1.borrow());
                }
            }

            // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
            if let Some(trace2) = trace2_option.as_mut() {
                if frontier1.is_empty() { trace2_option = None;}
                else {
                    // Allow `trace2` to compact logically up to the frontier we may yet receive,
                    // in the opposing input (`input1`). All `input1` times will be beyond this
                    // frontier, and joined times only need to be accurate when advanced to it.
                    trace2.set_logical_compaction(frontier1.frontier());
                    // Allow `trace2` to compact physically up to the upper bound of batches we
                    // have received in its input (`input2`). We will not require a cursor that
                    // is not beyond this bound.
                    trace2.set_physical_compaction(acknowledged2.borrow());
                }
            }
        }
    })
}
304
305
/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This allows us to avoid producing and buffering massive amounts of data, while giving the timely
/// dataflow system a chance to run operators that can consume and aggregate the data.
struct Deferred<T, C1, C2>
where
    T: Timestamp+Lattice+Ord,
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
{
    // Cursor over previously accepted updates of the opposing input.
    trace: C1,
    // Storage backing the `trace` cursor.
    trace_storage: C1::Storage,
    // Cursor over the newly arrived batch to join.
    batch: C2,
    // Storage backing the `batch` cursor.
    batch_storage: C2::Storage,
    // Capability at which all produced output is sent.
    capability: Capability<T>,
    // Set by `work` once either cursor is exhausted; no further output is possible.
    done: bool,
}
324
impl<T, C1, C2> Deferred<T, C1, C2>
where
    C1: Cursor<Time=T>,
    C2: for<'a> Cursor<Key<'a>=C1::Key<'a>, Time=T>,
    T: Timestamp+Lattice+Ord,
{
    /// Wraps the supplied cursors, storage, and output capability as outstanding join work.
    fn new(trace: C1, trace_storage: C1::Storage, batch: C2, batch_storage: C2::Storage, capability: Capability<T>) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
        }
    }

    /// True while neither cursor has been exhausted by `work`.
    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
    ///
    /// `logic` is invoked as (key, trace value, batch value, time, trace diff, batch diff,
    /// session); any argument reordering is the caller's responsibility (see `join_traces`).
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(&mut self, output: &mut OutputBuilderSession<T, EffortBuilder<CB>>, mut logic: L, fuel: &mut usize)
    where
        L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff, &mut JoinSession<T, CB, Capability<T>>),
    {

        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut thinker = JoinThinker::new();

        // Merge-join over keys: seek whichever cursor lags, and on a key match cross the
        // value histories. Stops when either cursor ends or the effort budget is spent.
        while let (Some(batch_key), Some(trace_key), true) = (batch.get_key(batch_storage), trace.get_key(trace_storage), effort < *fuel) {

            match trace_key.cmp(&batch_key) {
                Ordering::Less => trace.seek_key(trace_storage, batch_key),
                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
                Ordering::Equal => {

                    // Trace times are joined with the capability's time, as output will be
                    // sent at (at least) that capability anyway.
                    thinker.history1.edits.load(trace, trace_storage, |time| {
                        let mut time = C1::owned_time(time);
                        time.join_assign(meet);
                        time
                    });
                    thinker.history2.edits.load(batch, batch_storage, |time| C2::owned_time(time));

                    // populate `temp` with the results in the best way we know how.
                    thinker.think(|v1,v2,t,r1,r2| {
                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // TODO: Effort isn't perfectly tracked as we might still have some data in the
                    // session at the moment it's dropped.
                    effort += session.builder().0.take();
                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }
        // Work completes once either cursor runs out of keys: no further matches exist.
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        if effort > *fuel { *fuel = 0; }
        else              { *fuel -= effort; }
    }
}
402
/// Pairs the value histories of a single key from two cursors.
///
/// Holds one edit history per input; `think` calls back with every pairing of
/// edits, using the lattice join of the two times as the output time.
struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    // Edits loaded from the trace cursor (times pre-joined with the capability
    // time by `Deferred::work`).
    pub history1: ValueHistory<'a, C1>,
    // Edits loaded from the batch cursor.
    pub history2: ValueHistory<'a, C2>,
}
411
impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    /// Creates a thinker with two empty value histories.
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Invokes `results` with each cross-pairing of edits from the two histories,
    /// at the join (least upper bound) of the two edit times.
    fn think<F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {

        // for reasonably sized edits, do the dead-simple thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        }
        else {
            // For larger histories, replay both in time order, repeatedly pairing the
            // side with the earliest outstanding edit against the other side's buffer,
            // which is compacted (`advance_buffer_by`) as times advance.

            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            //       in here. If a time is ever repeated, for example, the call will be identical
            //       and accomplish nothing. If only a single record has been added, it may not
            //       be worth the time to collapse (advance, re-sort) the data when a linear scan
            //       is sufficient.

            while !replay1.is_done() && !replay2.is_done() {

                if replay1.time().unwrap().cmp(replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
                    // `replay1` holds the strictly earlier edit: compact `replay2`'s buffer up to
                    // our meet, pair our current edit against all of it, then advance.
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                }
                else {
                    // Symmetric case: `replay2`'s edit is no later than `replay1`'s.
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

            // Drain whichever replay still has edits against the other's remaining buffer.
            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}