// differential_dataflow/operators/reduce.rs
//! Applies a reduction function on records grouped by key.
//!
//! The `reduce` operator acts on `(key, val)` data.
//! Records with the same key are grouped together, and a user-supplied reduction function is applied
//! to the key and the list of values.
//! The function is expected to populate a list of output values.

8use crate::Data;
9
10use timely::progress::frontier::Antichain;
11use timely::progress::Timestamp;
12use timely::dataflow::operators::Operator;
13use timely::dataflow::channels::pact::Pipeline;
14
15use crate::operators::arrange::{Arranged, TraceAgent};
16use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
17use crate::trace::cursor::CursorList;
18use crate::trace::implementations::containers::BatchContainer;
19use crate::trace::TraceReader;
20
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
///
/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output,
/// and populate its final argument with whatever it feels to be appropriate updates. The behavior and
/// correctness of the implementation rely on this making sense, and e.g. ideally the updates would if
/// applied to the tentative output bring it in line with some function applied to the input.
///
/// The `push` closure is expected to clear its first argument, then populate it with the key and drain
/// the value updates, as appropriate for the container. It is critical that it clear the container as
/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one
/// key's computation to another, and will likely introduce non-determinism.
pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent<Tr2>>
where
    Tr1: TraceReader + Clone + 'static,
    Tr2: for<'a> Trace<Key<'a>=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static,
    Bu: Builder<Time=Tr2::Time, Output = Tr2::Batch, Input: Default>,
    L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
    P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
{
    // Filled in by the operator constructor below; read back once the operator exists.
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        let scope = trace.stream.scope();
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for arrange events.
            let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            let activator = Some(scope.activator_for(operator_info.address.clone()));
            let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = scope.worker().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }

            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // Hand a reader for the output trace back to the caller (via `result_trace`).
            *result_trace = Some(output_reader.clone());

            // Scratch space for times discovered during per-key computation; drained each round.
            let mut new_interesting_times = Vec::<Tr1::Time>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // sorted by (key, time), as well as capabilities for the lower envelope of the times.
            let mut pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut pending_time = Tr1::TimeContainer::with_capacity(0);
            let mut next_pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut next_pending_time = Tr1::TimeContainer::with_capacity(0);
            let mut capabilities = timely::dataflow::operators::CapabilitySet::<Tr1::Time>::default();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<Tr1::Time>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temp storage help there.
            let mut output_upper = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            move |(input, frontier), output| {

                // The operator receives input batches, which it treats as contiguous and will collect and
                // then process as one batch. It captures the input frontier from the batches, from the upstream
                // trace, and from the input frontier, and retires the work through that interval.
                //
                // Reduce may retain capabilities and need to perform work and produce output at times that
                // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)`
                // may result in outputs at `(1, 1)` as well, even with no input at that time.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain input batches in order, capturing capabilities and the last upper.
                input.for_each(|capability, batches| {
                    // Stash the capability so we can later produce output at or after its time.
                    capabilities.insert(capability.retain(0));
                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);
                // Incorporate the input frontier guarantees as well.
                let mut joined = Antichain::new();
                crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined);
                upper_limit = joined;

                // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed.
                if upper_limit != lower_limit {

                    // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs,
                    // and could not transmit the outputs even if they were (incorrectly) non-zero.
                    // We do have maintenance work after this logic, and should not fuse this test with the above test.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);

                        // Prepare an output buffer and builder for each capability.
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(Tr1::Time, Vec<(Tr2::ValOwn, Tr1::Time, Tr2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }
                        // Temporary staging for output building.
                        let mut buffer = Bu::Input::default();

                        // Reuseable state for performing the computation.
                        let mut thinker = history_replay::HistoryReplayer::new();

                        // Merge the received batch cursor with our list of interesting (key, time) moments.
                        // The interesting moments need to be in the interval to prompt work.

                        // March through the keys we must work on, merging `batch_cursors` and `exposed`.
                        let mut pending_pos = 0;
                        while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() {

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            // Taking the minimum preserves the (key, time)-sorted processing order.
                            let key1 = pending_keys.get(pending_pos);
                            let key2 = batch_cursor.get_key(batch_storage);
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None)       => key1,
                                (None, Some(key2))       => key2,
                                (None, None)             => unreachable!(),
                            };

                            // Populate `interesting_times` with interesting times not beyond `upper_limit`.
                            // TODO: This could just be `pending_time` and indexes within `lower .. upper`.
                            let prior_pos = pending_pos;
                            interesting_times.clear();
                            while pending_keys.get(pending_pos) == Some(key) {
                                let owned_time = Tr1::owned_time(pending_time.index(pending_pos));
                                if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); }
                                pending_pos += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // If there are new updates, or pending times, we must investigate!
                            if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() {

                                // do the per-key computation.
                                thinker.compute(
                                    key,
                                    (&mut source_cursor, source_storage),
                                    (&mut output_cursor, output_storage),
                                    (&mut batch_cursor, batch_storage),
                                    &interesting_times,
                                    &mut logic,
                                    &upper_limit,
                                    &mut buffers[..],
                                    &mut new_interesting_times,
                                );

                                // Advance the cursor if this key, so that the loop's validity check registers the work as done.
                                if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); }

                                // Merge novel pending times with any prior pending times we did not process.
                                // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted.
                                for pos in prior_pos .. pending_pos {
                                    let owned_time = Tr1::owned_time(pending_time.index(pos));
                                    if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); }
                                }
                                sort_dedup(&mut new_interesting_times);
                                for time in new_interesting_times.drain(..) {
                                    next_pending_keys.push_ref(key);
                                    next_pending_time.push_own(&time);
                                }

                                // Sort each buffer by value and move into the corresponding builder.
                                // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                                //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                                //       arbitrarily ordered times.
                                for index in 0 .. buffers.len() {
                                    buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                    // `push` is contractually required to clear and refill `buffer` (see fn docs).
                                    push(&mut buffer, key, &mut buffers[index].1);
                                    buffers[index].1.clear();
                                    builders[index].push(&mut buffer);

                                }
                            }
                            else {
                                // copy over the pending key and times.
                                for pos in prior_pos .. pending_pos {
                                    next_pending_keys.push_ref(pending_keys.index(pos));
                                    next_pending_time.push_ref(pending_time.index(pos));
                                }
                            }
                        }
                        // Drop to avoid lifetime issues that would lock `pending_{keys, time}`.
                        drop(thinker);

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert_ref(capability.time());
                            }

                            // Skip sealing degenerate (empty) intervals.
                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(Tr1::Time::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                // The next batch picks up exactly where this one ended.
                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }
                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Refresh pending keys and times, then downgrade capabilities to the frontier of times.
                        pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys);
                        pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

                        // Update `capabilities` to reflect pending times.
                        let mut frontier = Antichain::<Tr1::Time>::new();
                        let mut owned_time = Tr1::Time::minimum();
                        for pos in 0 .. pending_time.len() {
                            // Reuse `owned_time` to avoid an allocation per pending time.
                            Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
                            frontier.insert_ref(&owned_time);
                        }
                        capabilities.downgrade(frontier);
                    }

                    // ensure that observed progress is reflected in the output.
                    output_writer.seal(upper_limit.clone());

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
    )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
303
304
/// Sorts `list` and removes all duplicate elements, leaving it sorted and duplicate-free.
///
/// The `dedup` before the sort is a cheap linear pre-pass that removes runs of equal
/// elements (common when times arrive roughly in order) before paying for the sort;
/// the `dedup` after the sort removes any remaining duplicates.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    list.dedup();
    // `sort_unstable` avoids the stable sort's temporary allocation; stability is
    // unobservable here because elements equal under `Ord` are deduplicated anyway.
    list.sort_unstable();
    list.dedup();
}
311
/// Implementation based on replaying historical and new updates together.
313mod history_replay {
314
315    use timely::progress::Antichain;
316    use timely::PartialOrder;
317
318    use crate::lattice::Lattice;
319    use crate::trace::Cursor;
320    use crate::operators::ValueHistory;
321
322    use super::sort_dedup;
323
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        // Replayable history of the source (input) trace.
        input_history: ValueHistory<'a, C1>,
        // Replayable history of the previously produced output trace.
        output_history: ValueHistory<'a, C2>,
        // Replayable history of the newly received input batches.
        batch_history: ValueHistory<'a, C3>,
        // Staging area for the accumulated input collection at a time.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        // Staging area for the accumulated (tentative) output collection at a time.
        output_buffer: Vec<(V, C2::Diff)>,
        // Updates produced by the user logic, awaiting consolidation.
        update_buffer: Vec<(V, C2::Diff)>,
        // Output updates produced so far in this invocation, with their times retained.
        output_produced: Vec<((V, C2::Time), C2::Diff)>,
        // Synthetic times formed as joins of observed times; still to be processed.
        synth_times: Vec<C1::Time>,
        // Suffix meets of the supplied interesting times, used to compact loaded times.
        meets: Vec<C1::Time>,
        // Times already encountered that may make later times interesting.
        times_current: Vec<C1::Time>,
        // Scratch space for times discovered mid-replay.
        temporary: Vec<C1::Time>,
    }
345
346    impl<'a, C1, C2, C3, V> HistoryReplayer<'a, C1, C2, C3, V>
347    where
348        C1: Cursor,
349        C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
350        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
351        V: Clone + Ord,
352    {
353        pub fn new() -> Self {
354            HistoryReplayer {
355                input_history: ValueHistory::new(),
356                output_history: ValueHistory::new(),
357                batch_history: ValueHistory::new(),
358                input_buffer: Vec::new(),
359                output_buffer: Vec::new(),
360                update_buffer: Vec::new(),
361                output_produced: Vec::new(),
362                synth_times: Vec::new(),
363                meets: Vec::new(),
364                times_current: Vec::new(),
365                temporary: Vec::new(),
366            }
367        }
368        #[inline(never)]
369        pub fn compute<L>(
370            &mut self,
371            key: C1::Key<'a>,
372            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
373            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
374            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
375            times: &Vec<C1::Time>,
376            logic: &mut L,
377            upper_limit: &Antichain<C1::Time>,
378            outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
379            new_interesting: &mut Vec<C1::Time>)
380        where
381            L: FnMut(
382                C1::Key<'a>,
383                &[(C1::Val<'a>, C1::Diff)],
384                &mut Vec<(V, C2::Diff)>,
385                &mut Vec<(V, C2::Diff)>,
386            )
387        {
388
389            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
390            // and `times`, respectively "new work we just received" and "old times we were warned about".
391            //
392            // Our first step is to identify these times, so that we can use them to restrict the amount of
393            // information we need to recover from `input` and `output`; as all times of interest will have
394            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
395            // loaded times by performing the lattice `join` with this value.
396
397            // Load the batch contents.
398            let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));
399
400            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
401            // can be used to advance other historical times, which may consolidate their representation. As
402            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
403            // history forward.
404
405            self.meets.clear();
406            self.meets.extend(times.iter().cloned());
407            for index in (1 .. self.meets.len()).rev() {
408                self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
409            }
410
411            // Determine the meet of times in `batch` and `times`.
412            let mut meet = None;
413            update_meet(&mut meet, self.meets.get(0));
414            update_meet(&mut meet, batch_replay.meet());
415
416            // Having determined the meet, we can load the input and output histories, where we
417            // advance all times by joining them with `meet`. The resulting times are more compact
418            // and guaranteed to accumulate identically for times greater or equal to `meet`.
419
420            // Load the input and output histories.
421            let mut input_replay =
422            self.input_history.replay_key(source_cursor, source_storage, key, |time| {
423                let mut time = C1::owned_time(time);
424                if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
425                time
426            });
427            let mut output_replay =
428            self.output_history.replay_key(output_cursor, output_storage, key, |time| {
429                let mut time = C2::owned_time(time);
430                if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
431                time
432            });
433
434            self.synth_times.clear();
435            self.times_current.clear();
436            self.output_produced.clear();
437
438            // The frontier of times we may still consider.
439            // Derived from frontiers of our update histories, supplied times, and synthetic times.
440
441            let mut times_slice = &times[..];
442            let mut meets_slice = &self.meets[..];
443
444            // We have candidate times from `batch` and `times`, as well as times identified by either
445            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
446            // we consider in the course of evaluation. As long as any of these times exist, we need to
447            // keep examining times.
448            while let Some(next_time) = [   batch_replay.time(),
449                                            times_slice.first(),
450                                            input_replay.time(),
451                                            output_replay.time(),
452                                            self.synth_times.last(),
453                                        ].into_iter().flatten().min().cloned() {
454
455                // Advance input and output history replayers. This marks applicable updates as active.
456                input_replay.step_while_time_is(&next_time);
457                output_replay.step_while_time_is(&next_time);
458
459                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
460                // have any evidence that we should re-evaluate the user logic at this time. For a time
461                // to be "interesting" it would need to be the join of times that include either a time
462                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
463
464                // Advance batch history, and capture whether an update exists at `next_time`.
465                let mut interesting = batch_replay.step_while_time_is(&next_time);
466                if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } }
467
468                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
469                while self.synth_times.last() == Some(&next_time) {
470                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
471                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
472                    self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
473                    interesting = true;
474                }
475                while times_slice.first() == Some(&next_time) {
476                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
477                    self.times_current.push(times_slice[0].clone());
478                    times_slice = &times_slice[1..];
479                    meets_slice = &meets_slice[1..];
480                    interesting = true;
481                }
482
483                // Times could also be interesting if an interesting time is less than them, as they would join
484                // and become the time itself. They may not equal the current time because whatever frontier we
485                // are tracking may not have advanced far enough.
486                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
487                //       yield false positives if not sufficiently compact. Maybe we should look into this and see.
488                interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
489                interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
490
491                // We should only process times that are not in advance of `upper_limit`.
492                //
493                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
494                // We may have the guarantee that synthetic times will not be, as we test against the limit
495                // before we add the time to `synth_times`.
496                if !upper_limit.less_equal(&next_time) {
497
498                    // We should re-evaluate the computation if this is an interesting time.
499                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
500                    // output produced. This sounds like a good test to have for debug builds!
501                    if interesting {
502
503                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
504                        debug_assert!(self.input_buffer.is_empty());
505                        if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) };
506                        for ((value, time), diff) in input_replay.buffer().iter() {
507                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
508                            else { self.temporary.push(next_time.join(time)); }
509                        }
510                        for ((value, time), diff) in batch_replay.buffer().iter() {
511                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
512                            else { self.temporary.push(next_time.join(time)); }
513                        }
514                        crate::consolidation::consolidate(&mut self.input_buffer);
515
516                        // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use).
517                        if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) };
518                        for ((value, time), diff) in output_replay.buffer().iter() {
519                            if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); }
520                            else { self.temporary.push(next_time.join(time)); }
521                        }
522                        for ((value, time), diff) in self.output_produced.iter() {
523                            if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); }
524                            else { self.temporary.push(next_time.join(time)); }
525                        }
526                        crate::consolidation::consolidate(&mut self.output_buffer);
527
528                        // Apply user logic if non-empty input or output and see what happens!
529                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
530                            logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
531                            self.input_buffer.clear();
532                            self.output_buffer.clear();
533
534                            // Having subtracted output updates from user output, consolidate the results to determine
535                            // if there is anything worth reporting. Note: this also orders the results by value, so
536                            // that could make the above merging plan even easier.
537                            //
538                            // Stash produced updates into both capability-indexed buffers and `output_produced`.
539                            // The two locations are important, in that we will compact `output_produced` as we move
540                            // through times, but we cannot compact the output buffers because we need their actual
541                            // times.
542                            crate::consolidation::consolidate(&mut self.update_buffer);
543                            if !self.update_buffer.is_empty() {
544
545                                // We *should* be able to find a capability for `next_time`. Anything else would
546                                // indicate a logical error somewhere along the way; either we release a capability
547                                // we should have kept, or we have computed the output incorrectly (or both!)
548                                let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
549                                let idx = outputs.len() - idx.expect("failed to find index") - 1;
550                                for (val, diff) in self.update_buffer.drain(..) {
551                                    self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
552                                    outputs[idx].1.push((val, next_time.clone(), diff));
553                                }
554
555                                // Advance times in `self.output_produced` and consolidate the representation.
556                                // NOTE: We only do this when we add records; it could be that there are situations
557                                //       where we want to consolidate even without changes (because an initially
558                                //       large collection can now be collapsed).
559                                if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } }
560                                crate::consolidation::consolidate(&mut self.output_produced);
561                            }
562                        }
563                    }
564
565                    // Determine synthetic interesting times.
566                    //
567                    // Synthetic interesting times are produced differently for interesting and uninteresting
568                    // times. An uninteresting time must join with an interesting time to become interesting,
569                    // which means joins with `self.batch_history` and  `self.times_current`. I think we can
570                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
571                    // joined against everything.
572
573                    // Any time, even uninteresting times, must be joined with the current accumulation of
574                    // batch times as well as the current accumulation of `times_current`.
575                    self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
576                    self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
577                    sort_dedup(&mut self.temporary);
578
579                    // Introduce synthetic times, and re-organize if we add any.
580                    let synth_len = self.synth_times.len();
581                    for time in self.temporary.drain(..) {
582                        // We can either service `join` now, or must delay for the future.
583                        if upper_limit.less_equal(&time) {
584                            debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
585                            new_interesting.push(time);
586                        }
587                        else {
588                            self.synth_times.push(time);
589                        }
590                    }
591                    if self.synth_times.len() > synth_len {
592                        self.synth_times.sort_by(|x,y| y.cmp(x));
593                        self.synth_times.dedup();
594                    }
595                }
596                else if interesting {
597                    // We cannot process `next_time` now, and must delay it.
598                    //
599                    // I think we are probably only here because of an uninteresting time declared interesting,
600                    // as initial interesting times are filtered to be in interval, and synthetic times are also
601                    // filtered before introducing them to `self.synth_times`.
602                    new_interesting.push(next_time.clone());
603                    debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
604                }
605
606                // Update `meet` to track the meet of each source of times.
607                meet = None;
608                update_meet(&mut meet, batch_replay.meet());
609                update_meet(&mut meet, input_replay.meet());
610                update_meet(&mut meet, output_replay.meet());
611                for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
612                update_meet(&mut meet, meets_slice.first());
613
614                // Update `times_current` by the frontier.
615                if let Some(meet) = meet.as_ref() {
616                    for time in self.times_current.iter_mut() {
617                        *time = time.join(meet);
618                    }
619                }
620
621                sort_dedup(&mut self.times_current);
622            }
623
624            // Normalize the representation of `new_interesting`, deduplicating and ordering.
625            sort_dedup(new_interesting);
626        }
627    }
628
629    /// Updates an optional meet by an optional time.
630    fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
631        if let Some(time) = other {
632            if let Some(meet) = meet.as_mut() { meet.meet_assign(time); }
633            else { *meet = Some(time.clone()); }
634        }
635    }
636}