palimpsest_dataflow/operators/join.rs
1//! Match pairs of records based on a key.
2//!
//! The various `join` implementations require that the units (differences) of each collection can be
//! multiplied, and that the multiplication distributes over addition. That is, we will repeatedly
//! evaluate `(a + b) * c` as `(a * c) + (b * c)`; if these two expressions are not equal, little is
//! known about the actual output.
6use std::cmp::Ordering;
7
8use timely::container::PushInto;
9use timely::dataflow::channels::pact::Pipeline;
10use timely::dataflow::operators::generic::{Operator, OutputBuilderSession, Session};
11use timely::dataflow::operators::Capability;
12use timely::dataflow::{Scope, StreamCore};
13use timely::order::PartialOrder;
14use timely::progress::Timestamp;
15use timely::{Accountable, ContainerBuilder};
16
17use crate::difference::{Abelian, Multiply, Semigroup};
18use crate::hashable::Hashable;
19use crate::lattice::Lattice;
20use crate::operators::arrange::{ArrangeByKey, ArrangeBySelf, Arranged};
21use crate::operators::ValueHistory;
22use crate::trace::{BatchReader, Cursor};
23use crate::{Data, ExchangeData, VecCollection};
24
25use crate::trace::TraceReader;
26
27/// Join implementations for `(key,val)` data.
28pub trait Join<G: Scope, K: Data, V: Data, R: Semigroup> {
29 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and yields pairs `(key, (val1, val2))`.
30 ///
31 /// The [`join_map`](Join::join_map) method may be more convenient for non-trivial processing pipelines.
32 ///
33 /// # Examples
34 ///
35 /// ```
36 /// use palimpsest_dataflow::input::Input;
37 /// use palimpsest_dataflow::operators::Join;
38 ///
39 /// ::timely::example(|scope| {
40 ///
41 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
42 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
43 /// let z = scope.new_collection_from(vec![(0, (1, 'a')), (1, (3, 'b'))]).1;
44 ///
45 /// x.join(&y)
46 /// .assert_eq(&z);
47 /// });
48 /// ```
49 fn join<V2, R2>(
50 &self,
51 other: &VecCollection<G, (K, V2), R2>,
52 ) -> VecCollection<G, (K, (V, V2)), <R as Multiply<R2>>::Output>
53 where
54 K: ExchangeData,
55 V2: ExchangeData,
56 R2: ExchangeData + Semigroup,
57 R: Multiply<R2, Output: Semigroup + 'static>,
58 {
59 self.join_map(other, |k, v, v2| (k.clone(), (v.clone(), v2.clone())))
60 }
61
62 /// Matches pairs `(key,val1)` and `(key,val2)` based on `key` and then applies a function.
63 ///
64 /// # Examples
65 ///
66 /// ```
67 /// use palimpsest_dataflow::input::Input;
68 /// use palimpsest_dataflow::operators::Join;
69 ///
70 /// ::timely::example(|scope| {
71 ///
72 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
73 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1;
74 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
75 ///
76 /// x.join_map(&y, |_key, &a, &b| (a,b))
77 /// .assert_eq(&z);
78 /// });
79 /// ```
80 fn join_map<V2, R2, D, L>(
81 &self,
82 other: &VecCollection<G, (K, V2), R2>,
83 logic: L,
84 ) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
85 where
86 K: ExchangeData,
87 V2: ExchangeData,
88 R2: ExchangeData + Semigroup,
89 R: Multiply<R2, Output: Semigroup + 'static>,
90 D: Data,
91 L: FnMut(&K, &V, &V2) -> D + 'static;
92
93 /// Matches pairs `(key, val)` and `key` based on `key`, producing the former with frequencies multiplied.
94 ///
95 /// When the second collection contains frequencies that are either zero or one this is the more traditional
96 /// relational semijoin. When the second collection may contain multiplicities, this operation may scale up
97 /// the counts of the records in the first input.
98 ///
99 /// # Examples
100 ///
101 /// ```
102 /// use palimpsest_dataflow::input::Input;
103 /// use palimpsest_dataflow::operators::Join;
104 ///
105 /// ::timely::example(|scope| {
106 ///
107 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
108 /// let y = scope.new_collection_from(vec![0, 2]).1;
109 /// let z = scope.new_collection_from(vec![(0, 1)]).1;
110 ///
111 /// x.semijoin(&y)
112 /// .assert_eq(&z);
113 /// });
114 /// ```
115 fn semijoin<R2>(
116 &self,
117 other: &VecCollection<G, K, R2>,
118 ) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
119 where
120 K: ExchangeData,
121 R2: ExchangeData + Semigroup,
122 R: Multiply<R2, Output: Semigroup + 'static>;
123
124 /// Subtracts the semijoin with `other` from `self`.
125 ///
126 /// In the case that `other` has multiplicities zero or one this results
127 /// in a relational antijoin, in which we discard input records whose key
128 /// is present in `other`. If the multiplicities could be other than zero
129 /// or one, the semantic interpretation of this operator is less clear.
130 ///
131 /// In almost all cases, you should ensure that `other` has multiplicities
132 /// that are zero or one, perhaps by using the `distinct` operator.
133 ///
134 /// # Examples
135 ///
136 /// ```
137 /// use palimpsest_dataflow::input::Input;
138 /// use palimpsest_dataflow::operators::Join;
139 ///
140 /// ::timely::example(|scope| {
141 ///
142 /// let x = scope.new_collection_from(vec![(0, 1), (1, 3)]).1;
143 /// let y = scope.new_collection_from(vec![0, 2]).1;
144 /// let z = scope.new_collection_from(vec![(1, 3)]).1;
145 ///
146 /// x.antijoin(&y)
147 /// .assert_eq(&z);
148 /// });
149 /// ```
150 fn antijoin<R2>(&self, other: &VecCollection<G, K, R2>) -> VecCollection<G, (K, V), R>
151 where
152 K: ExchangeData,
153 R2: ExchangeData + Semigroup,
154 R: Multiply<R2, Output = R>,
155 R: Abelian + 'static;
156}
157
158impl<G, K, V, R> Join<G, K, V, R> for VecCollection<G, (K, V), R>
159where
160 G: Scope<Timestamp: Lattice + Ord>,
161 K: ExchangeData + Hashable,
162 V: ExchangeData,
163 R: ExchangeData + Semigroup,
164{
165 fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
166 &self,
167 other: &VecCollection<G, (K, V2), R2>,
168 mut logic: L,
169 ) -> VecCollection<G, D, <R as Multiply<R2>>::Output>
170 where
171 R: Multiply<R2, Output: Semigroup + 'static>,
172 L: FnMut(&K, &V, &V2) -> D + 'static,
173 {
174 let arranged1 = self.arrange_by_key();
175 let arranged2 = other.arrange_by_key();
176 arranged1.join_core(&arranged2, move |k, v1, v2| Some(logic(k, v1, v2)))
177 }
178
179 fn semijoin<R2: ExchangeData + Semigroup>(
180 &self,
181 other: &VecCollection<G, K, R2>,
182 ) -> VecCollection<G, (K, V), <R as Multiply<R2>>::Output>
183 where
184 R: Multiply<R2, Output: Semigroup + 'static>,
185 {
186 let arranged1 = self.arrange_by_key();
187 let arranged2 = other.arrange_by_self();
188 arranged1.join_core(&arranged2, |k, v, _| Some((k.clone(), v.clone())))
189 }
190
191 fn antijoin<R2: ExchangeData + Semigroup>(
192 &self,
193 other: &VecCollection<G, K, R2>,
194 ) -> VecCollection<G, (K, V), R>
195 where
196 R: Multiply<R2, Output = R>,
197 R: Abelian + 'static,
198 {
199 self.concat(&self.semijoin(other).negate())
200 }
201}
202
203impl<G, K, V, Tr> Join<G, K, V, Tr::Diff> for Arranged<G, Tr>
204where
205 G: Scope<Timestamp = Tr::Time>,
206 Tr: for<'a> TraceReader<Key<'a> = &'a K, Val<'a> = &'a V> + Clone + 'static,
207 K: ExchangeData + Hashable,
208 V: Data + 'static,
209{
210 fn join_map<V2: ExchangeData, R2: ExchangeData + Semigroup, D: Data, L>(
211 &self,
212 other: &VecCollection<G, (K, V2), R2>,
213 mut logic: L,
214 ) -> VecCollection<G, D, <Tr::Diff as Multiply<R2>>::Output>
215 where
216 Tr::Diff: Multiply<R2, Output: Semigroup + 'static>,
217 L: for<'a> FnMut(Tr::Key<'a>, Tr::Val<'a>, &V2) -> D + 'static,
218 {
219 let arranged2 = other.arrange_by_key();
220 self.join_core(&arranged2, move |k, v1, v2| Some(logic(k, v1, v2)))
221 }
222
223 fn semijoin<R2: ExchangeData + Semigroup>(
224 &self,
225 other: &VecCollection<G, K, R2>,
226 ) -> VecCollection<G, (K, V), <Tr::Diff as Multiply<R2>>::Output>
227 where
228 Tr::Diff: Multiply<R2, Output: Semigroup + 'static>,
229 {
230 let arranged2 = other.arrange_by_self();
231 self.join_core(&arranged2, |k, v, _| Some((k.clone(), v.clone())))
232 }
233
234 fn antijoin<R2: ExchangeData + Semigroup>(
235 &self,
236 other: &VecCollection<G, K, R2>,
237 ) -> VecCollection<G, (K, V), Tr::Diff>
238 where
239 Tr::Diff: Multiply<R2, Output = Tr::Diff>,
240 Tr::Diff: Abelian + 'static,
241 {
242 self.as_collection(|k, v| (k.clone(), v.clone()))
243 .concat(&self.semijoin(other).negate())
244 }
245}
246
247/// Matches the elements of two arranged traces.
248///
249/// This method is used by the various `join` implementations, but it can also be used
250/// directly in the event that one has a handle to an `Arranged<G,T>`, perhaps because
251/// the arrangement is available for re-use, or from the output of a `reduce` operator.
252pub trait JoinCore<
253 G: Scope<Timestamp: Lattice + Ord>,
254 K: 'static + ?Sized,
255 V: 'static + ?Sized,
256 R: Semigroup,
257>
258{
259 /// Joins two arranged collections with the same key type.
260 ///
261 /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
262 /// which produces something implementing `IntoIterator`, where the output collection will have an entry for
263 /// every value returned by the iterator.
264 ///
265 /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
266 /// contains the implementations for collections.
267 ///
268 /// # Examples
269 ///
270 /// ```
271 /// use palimpsest_dataflow::input::Input;
272 /// use palimpsest_dataflow::operators::arrange::ArrangeByKey;
273 /// use palimpsest_dataflow::operators::join::JoinCore;
274 /// use palimpsest_dataflow::trace::Trace;
275 ///
276 /// ::timely::example(|scope| {
277 ///
278 /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
279 /// .arrange_by_key();
280 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
281 /// .arrange_by_key();
282 ///
283 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b')]).1;
284 ///
285 /// x.join_core(&y, |_key, &a, &b| Some((a, b)))
286 /// .assert_eq(&z);
287 /// });
288 /// ```
289 fn join_core<Tr2, I, L>(
290 &self,
291 stream2: &Arranged<G, Tr2>,
292 result: L,
293 ) -> VecCollection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
294 where
295 Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
296 R: Multiply<Tr2::Diff, Output: Semigroup + 'static>,
297 I: IntoIterator<Item: Data>,
298 L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static;
299
300 /// An unsafe variant of `join_core` where the `result` closure takes additional arguments for `time` and
301 /// `diff` as input and returns an iterator over `(data, time, diff)` triplets. This allows for more
302 /// flexibility, but is more error-prone.
303 ///
304 /// Each matching pair of records `(key, val1)` and `(key, val2)` are subjected to the `result` function,
305 /// which produces something implementing `IntoIterator`, where the output collection will have an entry
306 /// for every value returned by the iterator.
307 ///
308 /// This trait is implemented for arrangements (`Arranged<G, T>`) rather than collections. The `Join` trait
309 /// contains the implementations for collections.
310 ///
311 /// # Examples
312 ///
313 /// ```
314 /// use palimpsest_dataflow::input::Input;
315 /// use palimpsest_dataflow::operators::arrange::ArrangeByKey;
316 /// use palimpsest_dataflow::operators::join::JoinCore;
317 /// use palimpsest_dataflow::trace::Trace;
318 ///
319 /// ::timely::example(|scope| {
320 ///
321 /// let x = scope.new_collection_from(vec![(0u32, 1), (1, 3)]).1
322 /// .arrange_by_key();
323 /// let y = scope.new_collection_from(vec![(0, 'a'), (1, 'b')]).1
324 /// .arrange_by_key();
325 ///
326 /// let z = scope.new_collection_from(vec![(1, 'a'), (3, 'b'), (3, 'b'), (3, 'b')]).1;
327 ///
328 /// // Returned values have weight `a`
329 /// x.join_core_internal_unsafe(&y, |_key, &a, &b, &t, &r1, &r2| Some(((a, b), t.clone(), a)))
330 /// .assert_eq(&z);
331 /// });
332 /// ```
333 fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
334 &self,
335 stream2: &Arranged<G, Tr2>,
336 result: L,
337 ) -> VecCollection<G, D, ROut>
338 where
339 Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
340 D: Data,
341 ROut: Semigroup + 'static,
342 I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
343 L: for<'a> FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static;
344}
345
346impl<G, K, V, R> JoinCore<G, K, V, R> for VecCollection<G, (K, V), R>
347where
348 G: Scope<Timestamp: Lattice + Ord>,
349 K: ExchangeData + Hashable,
350 V: ExchangeData,
351 R: ExchangeData + Semigroup,
352{
353 fn join_core<Tr2, I, L>(
354 &self,
355 stream2: &Arranged<G, Tr2>,
356 result: L,
357 ) -> VecCollection<G, I::Item, <R as Multiply<Tr2::Diff>>::Output>
358 where
359 Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
360 R: Multiply<Tr2::Diff, Output: Semigroup + 'static>,
361 I: IntoIterator<Item: Data>,
362 L: FnMut(&K, &V, Tr2::Val<'_>) -> I + 'static,
363 {
364 self.arrange_by_key().join_core(stream2, result)
365 }
366
367 fn join_core_internal_unsafe<Tr2, I, L, D, ROut>(
368 &self,
369 stream2: &Arranged<G, Tr2>,
370 result: L,
371 ) -> VecCollection<G, D, ROut>
372 where
373 Tr2: for<'a> TraceReader<Key<'a> = &'a K, Time = G::Timestamp> + Clone + 'static,
374 I: IntoIterator<Item = (D, G::Timestamp, ROut)>,
375 L: FnMut(&K, &V, Tr2::Val<'_>, &G::Timestamp, &R, &Tr2::Diff) -> I + 'static,
376 D: Data,
377 ROut: Semigroup + 'static,
378 {
379 self.arrange_by_key()
380 .join_core_internal_unsafe(stream2, result)
381 }
382}
383
384/// The session passed to join closures.
385pub type JoinSession<'a, 'b, T, CB, CT> = Session<'a, 'b, T, EffortBuilder<CB>, CT>;
386
387/// A container builder that tracks the length of outputs to estimate the effort of join closures.
388#[derive(Default, Debug)]
389pub struct EffortBuilder<CB>(pub std::cell::Cell<usize>, pub CB);
390
391impl<CB: ContainerBuilder> timely::container::ContainerBuilder for EffortBuilder<CB> {
392 type Container = CB::Container;
393
394 #[inline]
395 fn extract(&mut self) -> Option<&mut Self::Container> {
396 let extracted = self.1.extract();
397 self.0
398 .replace(self.0.take() + extracted.as_ref().map_or(0, |e| e.record_count() as usize));
399 extracted
400 }
401
402 #[inline]
403 fn finish(&mut self) -> Option<&mut Self::Container> {
404 let finished = self.1.finish();
405 self.0
406 .replace(self.0.take() + finished.as_ref().map_or(0, |e| e.record_count() as usize));
407 finished
408 }
409}
410
411impl<CB: PushInto<D>, D> PushInto<D> for EffortBuilder<CB> {
412 #[inline]
413 fn push_into(&mut self, item: D) {
414 self.1.push_into(item);
415 }
416}
417
/// An equijoin of two traces, sharing a common key type.
///
/// This method exists to provide join functionality without opinions on the specific input types, keys and values,
/// that should be presented. The two traces here can have arbitrary key and value types, which can be unsized and
/// even potentially unrelated to the input collection data. Importantly, the key and value types could be generic
/// associated types (GATs) of the traces, and we would seemingly struggle to frame these types as trait arguments.
///
/// The implementation produces a caller-specified container. Callers can use [`AsCollection`] to wrap the
/// output stream in a collection.
///
/// For matching pairs of updates from the two traces, `result` is called with the shared key, the value from
/// `arranged1`, the value from `arranged2`, their combined time, the difference from `arranged1`, the difference
/// from `arranged2`, and a [`JoinSession`] into which it may push output. The "correctness" of this method
/// depends heavily on the behavior of the supplied `result` function.
///
/// # Panics
///
/// Panics during operator construction if either trace's physical compaction frontier is not less or equal to the
/// upper bound of the batches that trace currently holds. Also panics if a batch arrives on one input after the
/// opposing trace handle has been dropped. That should not happen, because a trace is only dropped once the other
/// input's frontier is empty.
///
/// [`AsCollection`]: crate::collection::AsCollection
pub fn join_traces<G, T1, T2, L, CB>(
    arranged1: &Arranged<G, T1>,
    arranged2: &Arranged<G, T2>,
    mut result: L,
) -> StreamCore<G, CB::Container>
where
    G: Scope<Timestamp = T1::Time>,
    T1: TraceReader + Clone + 'static,
    T2: for<'a> TraceReader<Key<'a> = T1::Key<'a>, Time = T1::Time> + Clone + 'static,
    L: FnMut(
            T1::Key<'_>,
            T1::Val<'_>,
            T2::Val<'_>,
            &G::Timestamp,
            &T1::Diff,
            &T2::Diff,
            &mut JoinSession<T1::Time, CB, Capability<T1::Time>>,
        ) + 'static,
    CB: ContainerBuilder,
{
    // Take our own handles to both traces; from here on the two sides are treated symmetrically.
    let mut trace1 = arranged1.trace.clone();
    let mut trace2 = arranged2.trace.clone();

    arranged1.stream.binary_frontier(
        &arranged2.stream,
        Pipeline,
        Pipeline,
        "Join",
        move |capability, info| {
            // Acquire an activator to reschedule the operator when it has unfinished work.
            use timely::scheduling::Activator;
            let activations = arranged1.stream.scope().activations().clone();
            let activator = Activator::new(info.address, activations);

            // Our initial invariants are that for each trace, physical compaction is less or equal the trace's upper bound.
            // These invariants ensure that we can reference observed batch frontiers from the traces' initial upper bounds
            // onward, as long as we maintain our physical compaction capabilities appropriately. These assertions are tested
            // as we load up the initial work for the two traces, and before the operator is constructed.

            // Acknowledged frontier for each input.
            // These two are used exclusively to track batch boundaries on which we may want/need to call `cursor_through`.
            // They will drive our physical compaction of each trace, and we want to maintain at all times that each is beyond
            // the physical compaction frontier of their corresponding trace.
            // Should we ever *drop* a trace, these are 1. much harder to maintain correctly, but 2. no longer used.
            use timely::progress::frontier::Antichain;
            let mut acknowledged1 = Antichain::from_elem(<G::Timestamp>::minimum());
            let mut acknowledged2 = Antichain::from_elem(<G::Timestamp>::minimum());

            // Deferred work for batches received from each input.
            let mut todo1 = std::collections::VecDeque::new();
            let mut todo2 = std::collections::VecDeque::new();

            // We'll unload the initial batches here, to put ourselves in a less non-deterministic state to start.
            trace1.map_batches(|batch1| {
                acknowledged1.clone_from(batch1.upper());
                // No `todo1` work here, because we have not yet accepted any batches from `trace2`.
                // It is effectively "empty", because we choose to drain `trace1` before `trace2`.
                // Once we start streaming batches in, we will need to respond to new batches from
                // `input1` with logic that would have otherwise been here. Check out the next loop
                // for the structure.
            });
            // At this point, `ack1` should exactly equal `trace1.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace1`'s physical compaction frontier is before the frontier of completed times in `trace1`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace1.get_physical_compaction(),
                &acknowledged1.borrow()
            ));

            // We capture batch2 cursors first and establish work second to avoid taking a `RefCell` lock
            // on both traces at the same time, as they could be the same trace and this would panic.
            let mut batch2_cursors = Vec::new();
            trace2.map_batches(|batch2| {
                acknowledged2.clone_from(batch2.upper());
                batch2_cursors.push((batch2.cursor(), batch2.clone()));
            });
            // At this point, `ack2` should exactly equal `trace2.read_upper()`, as they are both determined by
            // iterating through batches and capturing the upper bound. This is a great moment to assert that
            // `trace2`'s physical compaction frontier is before the frontier of completed times in `trace2`.
            // TODO: in the case that this does not hold, instead "upgrade" the physical compaction frontier.
            assert!(PartialOrder::less_equal(
                &trace2.get_physical_compaction(),
                &acknowledged2.borrow()
            ));

            // Load up deferred work using trace2 cursors and batches captured just above.
            for (batch2_cursor, batch2) in batch2_cursors.into_iter() {
                // It is safe to ask for `ack1` because we asserted above that it is in advance of
                // `trace1`'s physical compaction frontier.
                let (trace1_cursor, trace1_storage) =
                    trace1.cursor_through(acknowledged1.borrow()).unwrap();
                // We could downgrade the capability here, but doing so is a bit complicated mathematically.
                // TODO: downgrade the capability by searching out the one time in `batch2.lower()` and not
                // in `batch2.upper()`. Only necessary for non-empty batches, as empty batches may not have
                // that property.
                todo2.push_back(Deferred::new(
                    trace1_cursor,
                    trace1_storage,
                    batch2_cursor,
                    batch2.clone(),
                    capability.clone(),
                ));
            }

            // Droppable handles to shared trace data structures.
            let mut trace1_option = Some(trace1);
            let mut trace2_option = Some(trace2);

            move |(input1, frontier1), (input2, frontier2), output| {
                // 1. Consuming input.
                //
                // The join computation repeatedly accepts batches of updates from each of its inputs.
                //
                // For each accepted batch, it prepares a work-item to join the batch against previously "accepted"
                // updates from its other input. It is important to track which updates have been accepted, because
                // we use a shared trace and there may be updates present that are in advance of this accepted bound.
                //
                // Batches are accepted: 1. in bulk at start-up (above), 2. as we observe them in the input stream,
                // and 3. if the trace can confirm a region of empty space directly following our accepted bound.
                // This last case is a consequence of our inability to transmit empty batches, as they may be formed
                // in the absence of timely dataflow capabilities.

                // Drain input 1, prepare work.
                input1.for_each(|capability, data| {
                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                    if let Some(ref mut trace2) = trace2_option {
                        let capability = capability.retain();
                        for batch1 in data.drain(..) {
                            // Ignore any pre-loaded data.
                            if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                                if !batch1.is_empty() {
                                    // It is safe to ask for `ack2` as we validated that it was at least `get_physical_compaction()`
                                    // at start-up, and have held back physical compaction ever since.
                                    let (trace2_cursor, trace2_storage) =
                                        trace2.cursor_through(acknowledged2.borrow()).unwrap();
                                    let batch1_cursor = batch1.cursor();
                                    todo1.push_back(Deferred::new(
                                        trace2_cursor,
                                        trace2_storage,
                                        batch1_cursor,
                                        batch1.clone(),
                                        capability.clone(),
                                    ));
                                }

                                // To update `acknowledged1` we might presume that `batch1.lower` should equal it, but we
                                // may have skipped over empty batches. Still, the batches are in-order, and we should be
                                // able to just assume the most recent `batch1.upper`
                                debug_assert!(PartialOrder::less_equal(
                                    &acknowledged1,
                                    batch1.upper()
                                ));
                                acknowledged1.clone_from(batch1.upper());
                            }
                        }
                    } else {
                        panic!("`trace2_option` dropped before `input1` emptied!");
                    }
                });

                // Drain input 2, prepare work.
                input2.for_each(|capability, data| {
                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                    if let Some(ref mut trace1) = trace1_option {
                        let capability = capability.retain();
                        for batch2 in data.drain(..) {
                            // Ignore any pre-loaded data.
                            if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                                if !batch2.is_empty() {
                                    // It is safe to ask for `ack1` as we validated that it was at least `get_physical_compaction()`
                                    // at start-up, and have held back physical compaction ever since.
                                    let (trace1_cursor, trace1_storage) =
                                        trace1.cursor_through(acknowledged1.borrow()).unwrap();
                                    let batch2_cursor = batch2.cursor();
                                    todo2.push_back(Deferred::new(
                                        trace1_cursor,
                                        trace1_storage,
                                        batch2_cursor,
                                        batch2.clone(),
                                        capability.clone(),
                                    ));
                                }

                                // To update `acknowledged2` we might presume that `batch2.lower` should equal it, but we
                                // may have skipped over empty batches. Still, the batches are in-order, and we should be
                                // able to just assume the most recent `batch2.upper`
                                debug_assert!(PartialOrder::less_equal(
                                    &acknowledged2,
                                    batch2.upper()
                                ));
                                acknowledged2.clone_from(batch2.upper());
                            }
                        }
                    } else {
                        panic!("`trace1_option` dropped before `input2` emptied!");
                    }
                });

                // Advance acknowledged frontiers through any empty regions that we may not receive as batches.
                if let Some(trace1) = trace1_option.as_mut() {
                    trace1.advance_upper(&mut acknowledged1);
                }
                if let Some(trace2) = trace2_option.as_mut() {
                    trace2.advance_upper(&mut acknowledged2);
                }

                // 2. Join computation.
                //
                // For each of the inputs, we do some amount of work (measured in terms of number
                // of output records produced). This is meant to yield control to allow downstream
                // operators to consume and reduce the output, but it also means to provide some
                // degree of responsiveness. There is a potential risk here that if we fall behind
                // then the increasing queues hold back physical compaction of the underlying traces
                // which results in unintentionally quadratic processing time (each batch of either
                // input must scan all batches from the other input).

                // Perform some amount of outstanding work for batches from `input1`.
                // These work items hold a `trace2` cursor first, so the value and difference
                // arguments are swapped back to (input1, input2) order before calling `result`.
                let mut fuel = 1_000_000;
                while !todo1.is_empty() && fuel > 0 {
                    todo1.front_mut().unwrap().work(
                        output,
                        |k, v2, v1, t, r2, r1, c| result(k, v1, v2, t, r1, r2, c),
                        &mut fuel,
                    );
                    if !todo1.front().unwrap().work_remains() {
                        todo1.pop_front();
                    }
                }

                // Perform some amount of outstanding work for batches from `input2`.
                // These work items already hold a `trace1` cursor first, so arguments pass through as-is.
                let mut fuel = 1_000_000;
                while !todo2.is_empty() && fuel > 0 {
                    todo2.front_mut().unwrap().work(
                        output,
                        |k, v1, v2, t, r1, r2, c| result(k, v1, v2, t, r1, r2, c),
                        &mut fuel,
                    );
                    if !todo2.front().unwrap().work_remains() {
                        todo2.pop_front();
                    }
                }

                // Re-activate operator if work remains.
                if !todo1.is_empty() || !todo2.is_empty() {
                    activator.activate();
                }

                // 3. Trace maintenance.
                //
                // Importantly, we use `input.frontier()` here rather than `acknowledged` to track
                // the progress of an input, because should we ever drop one of the traces we will
                // lose the ability to extract information from anything other than the input.
                // For example, if we dropped `trace2` we would not be able to use `advance_upper`
                // to keep `acknowledged2` up to date wrt empty batches, and would hold back logical
                // compaction of `trace1`.

                // Maintain `trace1`. Drop if `input2` is empty, or advance based on future needs.
                if let Some(trace1) = trace1_option.as_mut() {
                    if frontier2.is_empty() {
                        trace1_option = None;
                    } else {
                        // Allow `trace1` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input2`). All `input2` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace1.set_logical_compaction(frontier2.frontier());
                        // Allow `trace1` to compact physically up to the upper bound of batches we
                        // have received in its input (`input1`). We will not require a cursor that
                        // is not beyond this bound.
                        trace1.set_physical_compaction(acknowledged1.borrow());
                    }
                }

                // Maintain `trace2`. Drop if `input1` is empty, or advance based on future needs.
                if let Some(trace2) = trace2_option.as_mut() {
                    if frontier1.is_empty() {
                        trace2_option = None;
                    } else {
                        // Allow `trace2` to compact logically up to the frontier we may yet receive,
                        // in the opposing input (`input1`). All `input1` times will be beyond this
                        // frontier, and joined times only need to be accurate when advanced to it.
                        trace2.set_logical_compaction(frontier1.frontier());
                        // Allow `trace2` to compact physically up to the upper bound of batches we
                        // have received in its input (`input2`). We will not require a cursor that
                        // is not beyond this bound.
                        trace2.set_physical_compaction(acknowledged2.borrow());
                    }
                }
            }
        },
    )
}
723
/// Deferred join computation.
///
/// The structure wraps cursors which allow us to play out join computation at whatever rate we like.
/// This lets us avoid producing and buffering massive amounts of data before giving the timely
/// dataflow system a chance to run operators that can consume and aggregate that data.
struct Deferred<T, C1, C2>
where
    T: Timestamp + Lattice + Ord,
    C1: Cursor<Time = T>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T>,
{
    // Cursor over the opposing input's trace, as of the acknowledged frontier when the work was created.
    trace: C1,
    // Storage navigated by `trace`.
    trace_storage: C1::Storage,
    // Cursor over the newly accepted batch.
    batch: C2,
    // Storage navigated by `batch` (`join_traces` passes a clone of the batch itself).
    batch_storage: C2::Storage,
    // Capability used to open the output session; its time is joined into every trace time.
    capability: Capability<T>,
    // Set once either cursor has run out of keys.
    done: bool,
}

impl<T, C1, C2> Deferred<T, C1, C2>
where
    C1: Cursor<Time = T>,
    C2: for<'a> Cursor<Key<'a> = C1::Key<'a>, Time = T>,
    T: Timestamp + Lattice + Ord,
{
    /// Creates a work item that has not yet processed any keys.
    fn new(
        trace: C1,
        trace_storage: C1::Storage,
        batch: C2,
        batch_storage: C2::Storage,
        capability: Capability<T>,
    ) -> Self {
        Deferred {
            trace,
            trace_storage,
            batch,
            batch_storage,
            capability,
            done: false,
        }
    }

    /// Returns `true` while at least one more key could still match.
    fn work_remains(&self) -> bool {
        !self.done
    }

    /// Processes matching keys until the output effort reaches `*fuel`, or the work is exhausted.
    ///
    /// Effort is the number of records in containers produced by the output session's
    /// [`EffortBuilder`]. It is checked before each key, so a single key may overshoot the budget.
    /// On return, `*fuel` has been reduced by the effort spent, saturating at zero.
    /// [`work_remains`](Self::work_remains) reports whether either cursor still has keys.
    ///
    /// `logic` receives the key, the value from `trace`, the value from `batch`, the combined time,
    /// both differences, and the output session.
    #[inline(never)]
    fn work<L, CB: ContainerBuilder>(
        &mut self,
        output: &mut OutputBuilderSession<T, EffortBuilder<CB>>,
        mut logic: L,
        fuel: &mut usize,
    ) where
        L: for<'a> FnMut(
            C1::Key<'a>,
            C1::Val<'a>,
            C2::Val<'a>,
            &T,
            &C1::Diff,
            &C2::Diff,
            &mut JoinSession<T, CB, Capability<T>>,
        ),
    {
        // Trace times are joined with this time, presumably so that no produced time is
        // earlier than the capability used for the output session.
        let meet = self.capability.time();

        let mut effort = 0;
        let mut session = output.session_with_builder(&self.capability);

        let trace_storage = &self.trace_storage;
        let batch_storage = &self.batch_storage;

        let trace = &mut self.trace;
        let batch = &mut self.batch;

        let mut thinker = JoinThinker::new();

        // Merge-walk both cursors' keys while both have a key and fuel remains.
        while let (Some(batch_key), Some(trace_key), true) = (
            batch.get_key(batch_storage),
            trace.get_key(trace_storage),
            effort < *fuel,
        ) {
            match trace_key.cmp(&batch_key) {
                // Skip whichever cursor is behind forward to the other's key.
                Ordering::Less => trace.seek_key(trace_storage, batch_key),
                Ordering::Greater => batch.seek_key(batch_storage, trace_key),
                Ordering::Equal => {
                    thinker.history1.edits.load(trace, trace_storage, |time| {
                        let mut time = C1::owned_time(time);
                        time.join_assign(meet);
                        time
                    });
                    thinker
                        .history2
                        .edits
                        .load(batch, batch_storage, |time| C2::owned_time(time));

                    // Emit the join of this key's two update histories through `logic`.
                    thinker.think(|v1, v2, t, r1, r2| {
                        logic(batch_key, v1, v2, &t, r1, r2, &mut session);
                    });

                    // TODO: Effort isn't perfectly tracked as we might still have some data in the
                    // session at the moment it's dropped.
                    effort += session.builder().0.take();
                    batch.step_key(batch_storage);
                    trace.step_key(trace_storage);

                    thinker.history1.clear();
                    thinker.history2.clear();
                }
            }
        }
        // No further matches are possible once either side has run out of keys.
        self.done = !batch.key_valid(batch_storage) || !trace.key_valid(trace_storage);

        if effort > *fuel {
            *fuel = 0;
        } else {
            *fuel -= effort;
        }
    }
}
845
/// Scratch space holding one key's update histories from two cursors, and the logic to pair them up.
struct JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    // Updates (value, time, diff) for the current key from the first cursor.
    pub history1: ValueHistory<'a, C1>,
    // Updates (value, time, diff) for the current key from the second cursor.
    pub history2: ValueHistory<'a, C2>,
}

impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
    C1: Cursor,
    C2: Cursor<Time = C1::Time>,
{
    /// Creates a thinker with two empty histories.
    fn new() -> Self {
        JoinThinker {
            history1: ValueHistory::new(),
            history2: ValueHistory::new(),
        }
    }

    /// Calls `results(val1, val2, time, diff1, diff2)` to pair up the updates of the two histories.
    ///
    /// `time` is the lattice join of the two update times. If either history has fewer than 10 edits,
    /// every pair of edits is enumerated directly. Otherwise both histories are replayed in step,
    /// comparing their current times. Before pairing, the buffered updates of the lagging side are
    /// advanced by the other replay's `meet()`.
    // NOTE(review): presumably `meet()` is the meet of the replay's remaining times, which lets the
    // buffered updates be compacted. See `ValueHistory` to confirm.
    fn think<F: FnMut(C1::Val<'a>, C2::Val<'a>, C1::Time, &C1::Diff, &C2::Diff)>(
        &mut self,
        mut results: F,
    ) {
        // for reasonably sized edits, do the dead-simple thing.
        if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
            self.history1.edits.map(|v1, t1, d1| {
                self.history2.edits.map(|v2, t2, d2| {
                    results(v1, v2, t1.join(t2), d1, d2);
                })
            })
        } else {
            let mut replay1 = self.history1.replay();
            let mut replay2 = self.history2.replay();

            // TODO: It seems like there is probably a good deal of redundant `advance_buffer_by`
            // in here. If a time is ever repeated, for example, the call will be identical
            // and accomplish nothing. If only a single record has been added, it may not
            // be worth the time to collapse (advance, re-sort) the data when a linear scan
            // is sufficient.

            // While both replays have edits, step whichever has the strictly smaller current time
            // (ties step `replay2`). Pair its current edit with the other side's buffered updates.
            while !replay1.is_done() && !replay2.is_done() {
                if replay1.time().unwrap().cmp(replay2.time().unwrap())
                    == ::std::cmp::Ordering::Less
                {
                    replay2.advance_buffer_by(replay1.meet().unwrap());
                    for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                        let (val1, time1, diff1) = replay1.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay1.step();
                } else {
                    replay1.advance_buffer_by(replay2.meet().unwrap());
                    for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                        let (val2, time2, diff2) = replay2.edit().unwrap();
                        results(val1, val2, time1.join(time2), diff1, diff2);
                    }
                    replay2.step();
                }
            }

            // Drain whichever replay still has edits against the other's buffer.
            while !replay1.is_done() {
                replay2.advance_buffer_by(replay1.meet().unwrap());
                for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
                    let (val1, time1, diff1) = replay1.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay1.step();
            }
            while !replay2.is_done() {
                replay1.advance_buffer_by(replay2.meet().unwrap());
                for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
                    let (val2, time2, diff2) = replay2.edit().unwrap();
                    results(val1, val2, time1.join(time2), diff1, diff2);
                }
                replay2.step();
            }
        }
    }
}