Skip to main content

differential_dataflow/operators/
reduce.rs

//! Applies a reduction function on records grouped by key.
//!
//! The `reduce` operator acts on `(key, val)` data.
//! Records with the same key are grouped together, and a user-supplied reduction function is applied
//! to the key and the list of values.
//! The function is expected to populate a list of output values.
7
8use crate::Data;
9
10use timely::progress::frontier::Antichain;
11use timely::progress::Timestamp;
12use timely::dataflow::operators::Operator;
13use timely::dataflow::channels::pact::Pipeline;
14
15use crate::operators::arrange::{Arranged, TraceAgent};
16use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
17use crate::trace::cursor::CursorList;
18use crate::trace::implementations::containers::BatchContainer;
19use crate::trace::TraceReader;
20
21/// A key-wise reduction of values in an input trace.
22///
23/// This method exists to provide reduce functionality without opinions about qualifying trace types.
24///
25/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output,
26/// and populate its final argument with whatever it feels to be appopriate updates. The behavior and
27/// correctness of the implementation rely on this making sense, and e.g. ideally the updates would if
28/// applied to the tentative output bring it in line with some function applied to the input.
29///
30/// The `push` closure is expected to clear its first argument, then populate it with the key and drain
31/// the value updates, as appropriate for the container. It is critical that it clear the container as
32/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one
33/// key's computation to another, and will likely introduce non-determinism.
34pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent<Tr2>>
35where
36    Tr1: TraceReader + 'static,
37    Tr2: for<'a> Trace<Key<'a>=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static,
38    Bu: Builder<Time=Tr2::Time, Output = Tr2::Batch, Input: Default>,
39    L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
40    P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
41{
42    let mut result_trace = None;
43
44    // fabricate a data-parallel operator using the `unary_notify` pattern.
45    let stream = {
46
47        let mut source_trace = trace.trace;
48        let result_trace = &mut result_trace;
49        let scope = trace.stream.scope();
50        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {
51
52            // Acquire a logger for arrange events.
53            let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
54
55            let activator = Some(scope.activator_for(std::rc::Rc::clone(&operator_info.address)));
56            let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator);
57            // If there is default exert logic set, install it.
58            if let Some(exert_logic) = scope.worker().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
59                empty.set_exert_logic(exert_logic);
60            }
61
62            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
63
64            *result_trace = Some(output_reader.clone());
65
66            let mut new_interesting_times = Vec::<Tr1::Time>::new();
67
68            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
69            // sorted by (key, time), as well as capabilities for the lower envelope of the times.
70            let mut pending_keys = Tr1::KeyContainer::with_capacity(0);
71            let mut pending_time = Tr1::TimeContainer::with_capacity(0);
72            let mut next_pending_keys = Tr1::KeyContainer::with_capacity(0);
73            let mut next_pending_time = Tr1::TimeContainer::with_capacity(0);
74            let mut capabilities = timely::dataflow::operators::CapabilitySet::<Tr1::Time>::default();
75
76            // buffers and logic for computing per-key interesting times "efficiently".
77            let mut interesting_times = Vec::<Tr1::Time>::new();
78
79            // Upper and lower frontiers for the pending input and output batches to process.
80            let mut upper_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
81            let mut lower_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
82
83            // Output batches may need to be built piecemeal, and these temp storage help there.
84            let mut output_upper = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
85            let mut output_lower = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
86
87            move |(input, frontier), output| {
88
89                // The operator receives input batches, which it treats as contiguous and will collect and
90                // then process as one batch. It captures the input frontier from the batches, from the upstream
91                // trace, and from the input frontier, and retires the work through that interval.
92                //
93                // Reduce may retain capabilities and need to perform work and produce output at times that
94                // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)`
95                // may result in outputs at `(1, 1)` as well, even with no input at that time.
96
97                let mut batch_cursors = Vec::new();
98                let mut batch_storage = Vec::new();
99
100                // Downgrade previous upper limit to be current lower limit.
101                lower_limit.clear();
102                lower_limit.extend(upper_limit.borrow().iter().cloned());
103
104                // Drain input batches in order, capturing capabilities and the last upper.
105                input.for_each(|capability, batches| {
106                    capabilities.insert(capability.retain(0));
107                    for batch in batches.drain(..) {
108                        upper_limit.clone_from(batch.upper());
109                        batch_cursors.push(batch.cursor());
110                        batch_storage.push(batch);
111                    }
112                });
113
114                // Pull in any subsequent empty batches we believe to exist.
115                source_trace.advance_upper(&mut upper_limit);
116                // Incorporate the input frontier guarantees as well.
117                let mut joined = Antichain::new();
118                crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined);
119                upper_limit = joined;
120
121                // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed.
122                if upper_limit != lower_limit {
123
124                    // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs,
125                    // and could not transmit the outputs even if they were (incorrectly) non-zero.
126                    // We do have maintenance work after this logic, and should not fuse this test with the above test.
127                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {
128
129                        // cursors for navigating input and output traces.
130                        let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
131                        let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
132                        let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
133
134                        // Prepare an output buffer and builder for each capability.
135                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
136                        //       this as long as it requires that there is only one capability for each message.
137                        let mut buffers = Vec::<(Tr1::Time, Vec<(Tr2::ValOwn, Tr1::Time, Tr2::Diff)>)>::new();
138                        let mut builders = Vec::new();
139                        for cap in capabilities.iter() {
140                            buffers.push((cap.time().clone(), Vec::new()));
141                            builders.push(Bu::new());
142                        }
143                        // Temporary staging for output building.
144                        let mut buffer = Bu::Input::default();
145
146                        // Reuseable state for performing the computation.
147                        let mut thinker = history_replay::HistoryReplayer::new();
148
149                        // Merge the received batch cursor with our list of interesting (key, time) moments.
150                        // The interesting moments need to be in the interval to prompt work.
151
152                        // March through the keys we must work on, merging `batch_cursors` and `exposed`.
153                        let mut pending_pos = 0;
154                        while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() {
155
156                            // Determine the next key we will work on; could be synthetic, could be from a batch.
157                            let key1 = pending_keys.get(pending_pos);
158                            let key2 = batch_cursor.get_key(batch_storage);
159                            let key = match (key1, key2) {
160                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
161                                (Some(key1), None)       => key1,
162                                (None, Some(key2))       => key2,
163                                (None, None)             => unreachable!(),
164                            };
165
166                            // Populate `interesting_times` with interesting times not beyond `upper_limit`.
167                            // TODO: This could just be `pending_time` and indexes within `lower .. upper`.
168                            let prior_pos = pending_pos;
169                            interesting_times.clear();
170                            while pending_keys.get(pending_pos) == Some(key) {
171                                let owned_time = Tr1::owned_time(pending_time.index(pending_pos));
172                                if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); }
173                                pending_pos += 1;
174                            }
175
176                            // tidy up times, removing redundancy.
177                            sort_dedup(&mut interesting_times);
178
179                            // If there are new updates, or pending times, we must investigate!
180                            if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() {
181
182                                // do the per-key computation.
183                                thinker.compute(
184                                    key,
185                                    (&mut source_cursor, source_storage),
186                                    (&mut output_cursor, output_storage),
187                                    (&mut batch_cursor, batch_storage),
188                                    &interesting_times,
189                                    &mut logic,
190                                    &upper_limit,
191                                    &mut buffers[..],
192                                    &mut new_interesting_times,
193                                );
194
195                                // Advance the cursor if this key, so that the loop's validity check registers the work as done.
196                                if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); }
197
198                                // Merge novel pending times with any prior pending times we did not process.
199                                // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted.
200                                for pos in prior_pos .. pending_pos {
201                                    let owned_time = Tr1::owned_time(pending_time.index(pos));
202                                    if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); }
203                                }
204                                sort_dedup(&mut new_interesting_times);
205                                for time in new_interesting_times.drain(..) {
206                                    next_pending_keys.push_ref(key);
207                                    next_pending_time.push_own(&time);
208                                }
209
210                                // Sort each buffer by value and move into the corresponding builder.
211                                // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
212                                //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
213                                //       arbitrarily ordered times.
214                                for index in 0 .. buffers.len() {
215                                    buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
216                                    push(&mut buffer, key, &mut buffers[index].1);
217                                    buffers[index].1.clear();
218                                    builders[index].push(&mut buffer);
219
220                                }
221                            }
222                            else {
223                                // copy over the pending key and times.
224                                for pos in prior_pos .. pending_pos {
225                                    next_pending_keys.push_ref(pending_keys.index(pos));
226                                    next_pending_time.push_ref(pending_time.index(pos));
227                                }
228                            }
229                        }
230                        // Drop to avoid lifetime issues that would lock `pending_{keys, time}`.
231                        drop(thinker);
232
233                        // We start sealing output batches from the lower limit (previous upper limit).
234                        // In principle, we could update `lower_limit` itself, and it should arrive at
235                        // `upper_limit` by the end of the process.
236                        output_lower.clear();
237                        output_lower.extend(lower_limit.borrow().iter().cloned());
238
239                        // build and ship each batch (because only one capability per message).
240                        for (index, builder) in builders.drain(..).enumerate() {
241
242                            // Form the upper limit of the next batch, which includes all times greater
243                            // than the input batch, or the capabilities from i + 1 onward.
244                            output_upper.clear();
245                            output_upper.extend(upper_limit.borrow().iter().cloned());
246                            for capability in &capabilities[index + 1 ..] {
247                                output_upper.insert_ref(capability.time());
248                            }
249
250                            if output_upper.borrow() != output_lower.borrow() {
251
252                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(Tr1::Time::minimum()));
253                                let batch = builder.done(description);
254
255                                // ship batch to the output, and commit to the output trace.
256                                output.session(&capabilities[index]).give(batch.clone());
257                                output_writer.insert(batch, Some(capabilities[index].time().clone()));
258
259                                output_lower.clear();
260                                output_lower.extend(output_upper.borrow().iter().cloned());
261                            }
262                        }
263                        // This should be true, as the final iteration introduces no capabilities, and
264                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
265                        assert!(output_upper.borrow() == upper_limit.borrow());
266
267                        // Refresh pending keys and times, then downgrade capabilities to the frontier of times.
268                        pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys);
269                        pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);
270
271                        // Update `capabilities` to reflect pending times.
272                        let mut frontier = Antichain::<Tr1::Time>::new();
273                        let mut owned_time = Tr1::Time::minimum();
274                        for pos in 0 .. pending_time.len() {
275                            Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
276                            frontier.insert_ref(&owned_time);
277                        }
278                        capabilities.downgrade(frontier);
279                    }
280
281                    // ensure that observed progress is reflected in the output.
282                    output_writer.seal(upper_limit.clone());
283
284                    // We only anticipate future times in advance of `upper_limit`.
285                    source_trace.set_logical_compaction(upper_limit.borrow());
286                    output_reader.set_logical_compaction(upper_limit.borrow());
287
288                    // We will only slice the data between future batches.
289                    source_trace.set_physical_compaction(upper_limit.borrow());
290                    output_reader.set_physical_compaction(upper_limit.borrow());
291                }
292
293                // Exert trace maintenance if we have been so requested.
294                output_writer.exert();
295            }
296        }
297    )
298    };
299
300    Arranged { stream, trace: result_trace.unwrap() }
301}
302
303
/// Sorts `list` in ascending order and removes all duplicate elements.
///
/// Adjacent duplicates are removed once before sorting, so the sort sees fewer elements,
/// and once after sorting, so that no equal elements remain.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    list.dedup();
    list.sort();
    list.dedup();
}
310
311/// Implementation based on replaying historical and new updates together.
312mod history_replay {
313
314    use timely::progress::Antichain;
315
316    use crate::lattice::Lattice;
317    use crate::trace::Cursor;
318    use crate::operators::ValueHistory;
319
320    use super::sort_dedup;
321
322    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
323    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
324    pub struct HistoryReplayer<V1, V2, V, T, D1, D2> {
325        input_history: ValueHistory<V1, T, D1>,
326        output_history: ValueHistory<V2, T, D2>,
327        batch_history: ValueHistory<V1, T, D1>,
328        input_buffer: Vec<(V1, D1)>,
329        output_buffer: Vec<(V, D2)>,
330        update_buffer: Vec<(V, D2)>,
331        output_produced: Vec<((V, T), D2)>,
332        synth_times: Vec<T>,
333        meets: Vec<T>,
334        times_current: Vec<T>,
335        temporary: Vec<T>,
336    }
337
338    impl<V1, V2, V, T, D1, D2> HistoryReplayer<V1, V2, V, T, D1, D2>
339    where
340        V1: Copy + Ord,
341        V2: Copy + Ord,
342        V: Clone + Ord,
343        T: Ord + Clone + Lattice,
344        D1: Clone + crate::difference::Semigroup,
345        D2: Clone + crate::difference::Semigroup,
346    {
347        pub fn new() -> Self {
348            HistoryReplayer {
349                input_history: ValueHistory::new(),
350                output_history: ValueHistory::new(),
351                batch_history: ValueHistory::new(),
352                input_buffer: Vec::new(),
353                output_buffer: Vec::new(),
354                update_buffer: Vec::new(),
355                output_produced: Vec::new(),
356                synth_times: Vec::new(),
357                meets: Vec::new(),
358                times_current: Vec::new(),
359                temporary: Vec::new(),
360            }
361        }
362        #[inline(never)]
363        pub fn compute<'a, K, C1, C2, C3, L>(
364            &mut self,
365            key: K,
366            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
367            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
368            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
369            times: &Vec<T>,
370            logic: &mut L,
371            upper_limit: &Antichain<T>,
372            outputs: &mut [(T, Vec<(V, T, D2)>)],
373            new_interesting: &mut Vec<T>)
374        where
375            C1: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
376            C2: Cursor<Key<'a> = K, Val<'a> = V2, ValOwn = V, Time = T, Diff = D2>,
377            C3: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
378            K: Copy + Ord,
379            L: FnMut(K, &[(V1, D1)], &mut Vec<(V, D2)>, &mut Vec<(V, D2)>),
380        {
381
382            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
383            // and `times`, respectively "new work we just received" and "old times we were warned about".
384            //
385            // Our first step is to identify these times, so that we can use them to restrict the amount of
386            // information we need to recover from `input` and `output`; as all times of interest will have
387            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
388            // loaded times by performing the lattice `join` with this value.
389
390            // Load the batch contents.
391            let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, None);
392
393            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
394            // can be used to advance other historical times, which may consolidate their representation. As
395            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
396            // history forward.
397
398            self.meets.clear();
399            self.meets.extend(times.iter().cloned());
400            for index in (1 .. self.meets.len()).rev() {
401                self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
402            }
403
404            // Determine the meet of times in `batch` and `times`.
405            let mut meet = None;
406            update_meet(&mut meet, self.meets.get(0));
407            update_meet(&mut meet, batch_replay.meet());
408
409            // Having determined the meet, we can load the input and output histories, where we
410            // advance all times by joining them with `meet`. The resulting times are more compact
411            // and guaranteed to accumulate identically for times greater or equal to `meet`.
412
413            // Load the input and output histories.
414            let mut input_replay =
415            self.input_history.replay_key(source_cursor, source_storage, key, meet.as_ref());
416            let mut output_replay =
417            self.output_history.replay_key(output_cursor, output_storage, key, meet.as_ref());
418
419            self.synth_times.clear();
420            self.times_current.clear();
421            self.output_produced.clear();
422
423            // The frontier of times we may still consider.
424            // Derived from frontiers of our update histories, supplied times, and synthetic times.
425
426            let mut times_slice = &times[..];
427            let mut meets_slice = &self.meets[..];
428
429            // We have candidate times from `batch` and `times`, as well as times identified by either
430            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
431            // we consider in the course of evaluation. As long as any of these times exist, we need to
432            // keep examining times.
433            while let Some(next_time) = [   batch_replay.time(),
434                                            times_slice.first(),
435                                            input_replay.time(),
436                                            output_replay.time(),
437                                            self.synth_times.last(),
438                                        ].into_iter().flatten().min().cloned() {
439
440                // Advance input and output history replayers. This marks applicable updates as active.
441                input_replay.step_while_time_is(&next_time);
442                output_replay.step_while_time_is(&next_time);
443
444                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
445                // have any evidence that we should re-evaluate the user logic at this time. For a time
446                // to be "interesting" it would need to be the join of times that include either a time
447                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
448
449                // Advance batch history, and capture whether an update exists at `next_time`.
450                let mut interesting = batch_replay.step_while_time_is(&next_time);
451                if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } }
452
453                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
454                while self.synth_times.last() == Some(&next_time) {
455                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
456                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
457                    self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
458                    interesting = true;
459                }
460                while times_slice.first() == Some(&next_time) {
461                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
462                    self.times_current.push(times_slice[0].clone());
463                    times_slice = &times_slice[1..];
464                    meets_slice = &meets_slice[1..];
465                    interesting = true;
466                }
467
468                // Times could also be interesting if an interesting time is less than them, as they would join
469                // and become the time itself. They may not equal the current time because whatever frontier we
470                // are tracking may not have advanced far enough.
471                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
472                //       yield false positives if not sufficiently compact. Maybe we should look into this and see.
473                interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
474                interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
475
476                // We should only process times that are not in advance of `upper_limit`.
477                //
478                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
479                // We may have the guarantee that synthetic times will not be, as we test against the limit
480                // before we add the time to `synth_times`.
481                if !upper_limit.less_equal(&next_time) {
482
483                    // We should re-evaluate the computation if this is an interesting time.
484                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
485                    // output produced. This sounds like a good test to have for debug builds!
486                    if interesting {
487
488                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
489                        debug_assert!(self.input_buffer.is_empty());
490                        if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) };
491                        for ((value, time), diff) in input_replay.buffer().iter() {
492                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
493                            else { self.temporary.push(next_time.join(time)); }
494                        }
495                        for ((value, time), diff) in batch_replay.buffer().iter() {
496                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
497                            else { self.temporary.push(next_time.join(time)); }
498                        }
499                        crate::consolidation::consolidate(&mut self.input_buffer);
500
501                        // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use).
502                        if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) };
503                        for ((value, time), diff) in output_replay.buffer().iter() {
504                            if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); }
505                            else { self.temporary.push(next_time.join(time)); }
506                        }
507                        for ((value, time), diff) in self.output_produced.iter() {
508                            if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); }
509                            else { self.temporary.push(next_time.join(time)); }
510                        }
511                        crate::consolidation::consolidate(&mut self.output_buffer);
512
513                        // Apply user logic if non-empty input or output and see what happens!
514                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
515                            logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
516                            self.input_buffer.clear();
517                            self.output_buffer.clear();
518
519                            // Having subtracted output updates from user output, consolidate the results to determine
520                            // if there is anything worth reporting. Note: this also orders the results by value, so
521                            // that could make the above merging plan even easier.
522                            //
523                            // Stash produced updates into both capability-indexed buffers and `output_produced`.
524                            // The two locations are important, in that we will compact `output_produced` as we move
525                            // through times, but we cannot compact the output buffers because we need their actual
526                            // times.
527                            crate::consolidation::consolidate(&mut self.update_buffer);
528                            if !self.update_buffer.is_empty() {
529
530                                // We *should* be able to find a capability for `next_time`. Any thing else would
531                                // indicate a logical error somewhere along the way; either we release a capability
532                                // we should have kept, or we have computed the output incorrectly (or both!)
533                                let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
534                                let idx = outputs.len() - idx.expect("failed to find index") - 1;
535                                for (val, diff) in self.update_buffer.drain(..) {
536                                    self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
537                                    outputs[idx].1.push((val, next_time.clone(), diff));
538                                }
539
540                                // Advance times in `self.output_produced` and consolidate the representation.
541                                // NOTE: We only do this when we add records; it could be that there are situations
542                                //       where we want to consolidate even without changes (because an initially
543                                //       large collection can now be collapsed).
544                                if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } }
545                                crate::consolidation::consolidate(&mut self.output_produced);
546                            }
547                        }
548                    }
549
550                    // Determine synthetic interesting times.
551                    //
552                    // Synthetic interesting times are produced differently for interesting and uninteresting
553                    // times. An uninteresting time must join with an interesting time to become interesting,
554                    // which means joins with `self.batch_history` and  `self.times_current`. I think we can
555                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
556                    // joined against everything.
557
558                    // Any time, even uninteresting times, must be joined with the current accumulation of
559                    // batch times as well as the current accumulation of `times_current`.
560                    self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
561                    self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
562                    sort_dedup(&mut self.temporary);
563
564                    // Introduce synthetic times, and re-organize if we add any.
565                    let synth_len = self.synth_times.len();
566                    for time in self.temporary.drain(..) {
567                        // We can either service `join` now, or must delay for the future.
568                        if upper_limit.less_equal(&time) {
569                            debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
570                            new_interesting.push(time);
571                        }
572                        else {
573                            self.synth_times.push(time);
574                        }
575                    }
576                    if self.synth_times.len() > synth_len {
577                        self.synth_times.sort_by(|x,y| y.cmp(x));
578                        self.synth_times.dedup();
579                    }
580                }
581                else if interesting {
582                    // We cannot process `next_time` now, and must delay it.
583                    //
584                    // I think we are probably only here because of an uninteresting time declared interesting,
585                    // as initial interesting times are filtered to be in interval, and synthetic times are also
586                    // filtered before introducing them to `self.synth_times`.
587                    new_interesting.push(next_time.clone());
588                    debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
589                }
590
591                // Update `meet` to track the meet of each source of times.
592                meet = None;
593                update_meet(&mut meet, batch_replay.meet());
594                update_meet(&mut meet, input_replay.meet());
595                update_meet(&mut meet, output_replay.meet());
596                for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
597                update_meet(&mut meet, meets_slice.first());
598
599                // Update `times_current` by the frontier.
600                if let Some(meet) = meet.as_ref() {
601                    for time in self.times_current.iter_mut() {
602                        *time = time.join(meet);
603                    }
604                }
605
606                sort_dedup(&mut self.times_current);
607            }
608
609            // Normalize the representation of `new_interesting`, deduplicating and ordering.
610            sort_dedup(new_interesting);
611        }
612    }
613
614    /// Updates an optional meet by an optional time.
615    fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
616        if let Some(time) = other {
617            if let Some(meet) = meet.as_mut() { meet.meet_assign(time); }
618            else { *meet = Some(time.clone()); }
619        }
620    }
621}