Skip to main content

palimpsest_dataflow/operators/
reduce.rs

1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use crate::difference::{Abelian, Semigroup};
9use crate::hashable::Hashable;
10use crate::{Data, ExchangeData, VecCollection};
11use timely::container::PushInto;
12
13use timely::dataflow::channels::pact::Pipeline;
14use timely::dataflow::operators::Capability;
15use timely::dataflow::operators::Operator;
16use timely::dataflow::*;
17use timely::order::PartialOrder;
18use timely::progress::frontier::Antichain;
19use timely::progress::Timestamp;
20
21use crate::lattice::Lattice;
22use crate::operators::arrange::{ArrangeByKey, ArrangeBySelf, Arranged, TraceAgent};
23use crate::trace::cursor::CursorList;
24use crate::trace::implementations::containers::BatchContainer;
25use crate::trace::implementations::merge_batcher::container::MergerChunk;
26use crate::trace::implementations::{KeyBuilder, KeySpine, ValBuilder, ValSpine};
27use crate::trace::TraceReader;
28use crate::trace::{BatchReader, Builder, Cursor, Description, ExertionLogic, Trace};
29
30/// Extension trait for the `reduce` differential dataflow method.
31pub trait Reduce<G: Scope<Timestamp: Lattice + Ord>, K: Data, V: Data, R: Semigroup> {
32    /// Applies a reduction function on records grouped by key.
33    ///
34    /// Input data must be structured as `(key, val)` pairs.
35    /// The user-supplied reduction function takes as arguments
36    ///
37    /// 1. a reference to the key,
38    /// 2. a reference to the slice of values and their accumulated updates,
39    /// 3. a mutuable reference to a vector to populate with output values and accumulated updates.
40    ///
41    /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
42    /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
43    /// `Ord` implementations.
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// use palimpsest_dataflow::input::Input;
49    /// use palimpsest_dataflow::operators::Reduce;
50    ///
51    /// ::timely::example(|scope| {
52    ///     // report the smallest value for each group
53    ///     scope.new_collection_from(1 .. 10).1
54    ///          .map(|x| (x / 3, x))
55    ///          .reduce(|_key, input, output| {
56    ///              output.push((*input[0].0, 1))
57    ///          });
58    /// });
59    /// ```
60    fn reduce<L, V2: Data, R2: Ord + Abelian + 'static>(
61        &self,
62        logic: L,
63    ) -> VecCollection<G, (K, V2), R2>
64    where
65        L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,
66    {
67        self.reduce_named("Reduce", logic)
68    }
69
70    /// As `reduce` with the ability to name the operator.
71    fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
72        &self,
73        name: &str,
74        logic: L,
75    ) -> VecCollection<G, (K, V2), R2>
76    where
77        L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static;
78}
79
80impl<G, K, V, R> Reduce<G, K, V, R> for VecCollection<G, (K, V), R>
81where
82    G: Scope<Timestamp: Lattice + Ord>,
83    K: ExchangeData + Hashable,
84    V: ExchangeData,
85    R: ExchangeData + Semigroup,
86{
87    fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
88        &self,
89        name: &str,
90        logic: L,
91    ) -> VecCollection<G, (K, V2), R2>
92    where
93        L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,
94    {
95        self.arrange_by_key_named(&format!("Arrange: {}", name))
96            .reduce_named(name, logic)
97    }
98}
99
100impl<G, K: Data, V: Data, T1, R: Ord + Semigroup + 'static> Reduce<G, K, V, R> for Arranged<G, T1>
101where
102    G: Scope<Timestamp = T1::Time>,
103    T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwn = K, Val<'a> = &'a V, Diff = R>
104        + Clone
105        + 'static,
106{
107    fn reduce_named<L, V2: Data, R2: Ord + Abelian + 'static>(
108        &self,
109        name: &str,
110        logic: L,
111    ) -> VecCollection<G, (K, V2), R2>
112    where
113        L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>) + 'static,
114    {
115        self.reduce_abelian::<_, ValBuilder<_, _, _, _>, ValSpine<K, V2, _, _>>(name, logic)
116            .as_collection(|k, v| (k.clone(), v.clone()))
117    }
118}
119
120/// Extension trait for the `threshold` and `distinct` differential dataflow methods.
121pub trait Threshold<G: Scope<Timestamp: Lattice + Ord>, K: Data, R1: Semigroup> {
122    /// Transforms the multiplicity of records.
123    ///
124    /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
125    /// least the computation may behave as if it does. Otherwise, the transformation
126    /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
127    ///
128    /// # Examples
129    ///
130    /// ```
131    /// use palimpsest_dataflow::input::Input;
132    /// use palimpsest_dataflow::operators::Threshold;
133    ///
134    /// ::timely::example(|scope| {
135    ///     // report at most one of each key.
136    ///     scope.new_collection_from(1 .. 10).1
137    ///          .map(|x| x / 3)
138    ///          .threshold(|_,c| c % 2);
139    /// });
140    /// ```
141    fn threshold<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
142        &self,
143        thresh: F,
144    ) -> VecCollection<G, K, R2> {
145        self.threshold_named("Threshold", thresh)
146    }
147
148    /// A `threshold` with the ability to name the operator.
149    fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
150        &self,
151        name: &str,
152        thresh: F,
153    ) -> VecCollection<G, K, R2>;
154
155    /// Reduces the collection to one occurrence of each distinct element.
156    ///
157    /// # Examples
158    ///
159    /// ```
160    /// use palimpsest_dataflow::input::Input;
161    /// use palimpsest_dataflow::operators::Threshold;
162    ///
163    /// ::timely::example(|scope| {
164    ///     // report at most one of each key.
165    ///     scope.new_collection_from(1 .. 10).1
166    ///          .map(|x| x / 3)
167    ///          .distinct();
168    /// });
169    /// ```
170    fn distinct(&self) -> VecCollection<G, K, isize> {
171        self.distinct_core()
172    }
173
174    /// Distinct for general integer differences.
175    ///
176    /// This method allows `distinct` to produce collections whose difference
177    /// type is something other than an `isize` integer, for example perhaps an
178    /// `i32`.
179    fn distinct_core<R2: Ord + Abelian + 'static + From<i8>>(&self) -> VecCollection<G, K, R2> {
180        self.threshold_named("Distinct", |_, _| R2::from(1i8))
181    }
182}
183
184impl<
185        G: Scope<Timestamp: Lattice + Ord>,
186        K: ExchangeData + Hashable,
187        R1: ExchangeData + Semigroup,
188    > Threshold<G, K, R1> for VecCollection<G, K, R1>
189{
190    fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
191        &self,
192        name: &str,
193        thresh: F,
194    ) -> VecCollection<G, K, R2> {
195        self.arrange_by_self_named(&format!("Arrange: {}", name))
196            .threshold_named(name, thresh)
197    }
198}
199
200impl<G, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
201where
202    G: Scope<Timestamp = T1::Time>,
203    T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwn = K, Val<'a> = &'a (), Diff = R1>
204        + Clone
205        + 'static,
206{
207    fn threshold_named<R2: Ord + Abelian + 'static, F: FnMut(&K, &R1) -> R2 + 'static>(
208        &self,
209        name: &str,
210        mut thresh: F,
211    ) -> VecCollection<G, K, R2> {
212        self.reduce_abelian::<_, KeyBuilder<K, G::Timestamp, R2>, KeySpine<K, G::Timestamp, R2>>(
213            name,
214            move |k, s, t| t.push(((), thresh(k, &s[0].1))),
215        )
216        .as_collection(|k, _| k.clone())
217    }
218}
219
220/// Extension trait for the `count` differential dataflow method.
221pub trait Count<G: Scope<Timestamp: Lattice + Ord>, K: Data, R: Semigroup> {
222    /// Counts the number of occurrences of each element.
223    ///
224    /// # Examples
225    ///
226    /// ```
227    /// use palimpsest_dataflow::input::Input;
228    /// use palimpsest_dataflow::operators::Count;
229    ///
230    /// ::timely::example(|scope| {
231    ///     // report the number of occurrences of each key
232    ///     scope.new_collection_from(1 .. 10).1
233    ///          .map(|x| x / 3)
234    ///          .count();
235    /// });
236    /// ```
237    fn count(&self) -> VecCollection<G, (K, R), isize> {
238        self.count_core()
239    }
240
241    /// Count for general integer differences.
242    ///
243    /// This method allows `count` to produce collections whose difference
244    /// type is something other than an `isize` integer, for example perhaps an
245    /// `i32`.
246    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2>;
247}
248
249impl<
250        G: Scope<Timestamp: Lattice + Ord>,
251        K: ExchangeData + Hashable,
252        R: ExchangeData + Semigroup,
253    > Count<G, K, R> for VecCollection<G, K, R>
254{
255    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2> {
256        self.arrange_by_self_named("Arrange: Count").count_core()
257    }
258}
259
260impl<G, K: Data, T1, R: Data + Semigroup> Count<G, K, R> for Arranged<G, T1>
261where
262    G: Scope<Timestamp = T1::Time>,
263    T1: for<'a> TraceReader<Key<'a> = &'a K, KeyOwn = K, Val<'a> = &'a (), Diff = R>
264        + Clone
265        + 'static,
266{
267    fn count_core<R2: Ord + Abelian + From<i8> + 'static>(&self) -> VecCollection<G, (K, R), R2> {
268        self.reduce_abelian::<_,ValBuilder<K,R,G::Timestamp,R2>,ValSpine<K,R,G::Timestamp,R2>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
269            .as_collection(|k,c| (k.clone(), c.clone()))
270    }
271}
272
273/// Extension trait for the `reduce_core` differential dataflow method.
274pub trait ReduceCore<G: Scope<Timestamp: Lattice + Ord>, K: ToOwned + ?Sized, V: Data, R: Semigroup>
275{
276    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
277    ///
278    /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
279    /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
280    ///
281    /// # Examples
282    ///
283    /// ```
284    /// use palimpsest_dataflow::input::Input;
285    /// use palimpsest_dataflow::operators::reduce::ReduceCore;
286    /// use palimpsest_dataflow::trace::Trace;
287    /// use palimpsest_dataflow::trace::implementations::{ValBuilder, ValSpine};
288    ///
289    /// ::timely::example(|scope| {
290    ///
291    ///     let trace =
292    ///     scope.new_collection_from(1 .. 10u32).1
293    ///          .map(|x| (x, x))
294    ///          .reduce_abelian::<_,ValBuilder<_,_,_,_>,ValSpine<_,_,_,_>>(
295    ///             "Example",
296    ///              move |_key, src, dst| dst.push((*src[0].0, 1))
297    ///          )
298    ///          .trace;
299    /// });
300    /// ```
301    fn reduce_abelian<L, Bu, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
302    where
303        T2: for<'a> Trace<
304                Key<'a> = &'a K,
305                KeyOwn = K,
306                ValOwn = V,
307                Time = G::Timestamp,
308                Diff: Abelian,
309            > + 'static,
310        Bu: Builder<
311            Time = T2::Time,
312            Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>,
313            Output = T2::Batch,
314        >,
315        L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>) + 'static,
316    {
317        self.reduce_core::<_, Bu, T2>(name, move |key, input, output, change| {
318            if !input.is_empty() {
319                logic(key, input, change);
320            }
321            change.extend(output.drain(..).map(|(x, mut d)| {
322                d.negate();
323                (x, d)
324            }));
325            crate::consolidation::consolidate(change);
326        })
327    }
328
329    /// Solves for output updates when presented with inputs and would-be outputs.
330    ///
331    /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
332    /// and it may not be safe to index into the first element.
333    /// At least one of the two collections will be non-empty.
334    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
335    where
336        T2: for<'a> Trace<Key<'a> = &'a K, KeyOwn = K, ValOwn = V, Time = G::Timestamp> + 'static,
337        Bu: Builder<
338            Time = T2::Time,
339            Input = Vec<((K::Owned, V), T2::Time, T2::Diff)>,
340            Output = T2::Batch,
341        >,
342        L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>) + 'static;
343}
344
345impl<G, K, V, R> ReduceCore<G, K, V, R> for VecCollection<G, (K, V), R>
346where
347    G: Scope,
348    G::Timestamp: Lattice + Ord,
349    K: ExchangeData + Hashable,
350    V: ExchangeData,
351    R: ExchangeData + Semigroup,
352{
353    fn reduce_core<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
354    where
355        V: Data,
356        T2: for<'a> Trace<Key<'a> = &'a K, KeyOwn = K, ValOwn = V, Time = G::Timestamp> + 'static,
357        Bu: Builder<Time = T2::Time, Input = Vec<((K, V), T2::Time, T2::Diff)>, Output = T2::Batch>,
358        L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>) + 'static,
359    {
360        self.arrange_by_key_named(&format!("Arrange: {}", name))
361            .reduce_core::<_, Bu, _>(name, logic)
362    }
363}
364
365/// A key-wise reduction of values in an input trace.
366///
367/// This method exists to provide reduce functionality without opinions about qualifying trace types.
368pub fn reduce_trace<G, T1, Bu, T2, L>(
369    trace: &Arranged<G, T1>,
370    name: &str,
371    mut logic: L,
372) -> Arranged<G, TraceAgent<T2>>
373where
374    G: Scope<Timestamp = T1::Time>,
375    T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
376    T2: for<'a> Trace<Key<'a> = T1::Key<'a>, KeyOwn = T1::KeyOwn, ValOwn: Data, Time = T1::Time>
377        + 'static,
378    Bu: Builder<
379        Time = T2::Time,
380        Output = T2::Batch,
381        Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
382    >,
383    L: FnMut(
384            T1::Key<'_>,
385            &[(T1::Val<'_>, T1::Diff)],
386            &mut Vec<(T2::ValOwn, T2::Diff)>,
387            &mut Vec<(T2::ValOwn, T2::Diff)>,
388        ) + 'static,
389{
390    let mut result_trace = None;
391
392    // fabricate a data-parallel operator using the `unary_notify` pattern.
393    let stream = {
394        let result_trace = &mut result_trace;
395        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {
396
397            // Acquire a logger for arrange events.
398            let logger = trace.stream.scope().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
399
400            let activator = Some(trace.stream.scope().activator_for(operator_info.address.clone()));
401            let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
402            // If there is default exert logic set, install it.
403            if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
404                empty.set_exert_logic(exert_logic);
405            }
406
407
408            let mut source_trace = trace.trace.clone();
409
410            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
411
412            // let mut output_trace = TraceRc::make_from(agent).0;
413            *result_trace = Some(output_reader.clone());
414
415            // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
416            // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
417            let mut new_interesting_times = Vec::<G::Timestamp>::new();
418
419            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
420            // as well as capabilities for these times (or their lower envelope, at least).
421            let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
422            let mut capabilities = Vec::<Capability<G::Timestamp>>::new();
423
424            // buffers and logic for computing per-key interesting times "efficiently".
425            let mut interesting_times = Vec::<G::Timestamp>::new();
426
427            // Upper and lower frontiers for the pending input and output batches to process.
428            let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
429            let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
430
431            // Output batches may need to be built piecemeal, and these temp storage help there.
432            let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
433            let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
434
435            let id = trace.stream.scope().index();
436
437            move |(input, _frontier), output| {
438
439                // The `reduce` operator receives fully formed batches, which each serve as an indication
440                // that the frontier has advanced to the upper bound of their description.
441                //
442                // Although we could act on each individually, several may have been sent, and it makes
443                // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
444                // attention to which times need to be collected under which capability, so that we can
445                // assemble output batches correctly. We will maintain several builders concurrently, and
446                // place output updates into the appropriate builder.
447                //
448                // It turns out we must use notificators, as we cannot await empty batches from arrange to
449                // indicate progress, as the arrange may not hold the capability to send such. Instead, we
450                // must watch for progress here (and the upper bound of received batches) to tell us how
451                // far we can process work.
452                //
453                // We really want to retire all batches we receive, so we want a frontier which reflects
454                // both information from batches as well as progress information. I think this means that
455                // we keep times that are greater than or equal to a time in the other frontier, deduplicated.
456
457                let mut batch_cursors = Vec::new();
458                let mut batch_storage = Vec::new();
459
460                // Downgrade previous upper limit to be current lower limit.
461                lower_limit.clear();
462                lower_limit.extend(upper_limit.borrow().iter().cloned());
463
464                // Drain the input stream of batches, validating the contiguity of the batch descriptions and
465                // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
466                // times in the batch.
467                input.for_each(|capability, batches| {
468
469                    for batch in batches.drain(..) {
470                        upper_limit.clone_from(batch.upper());
471                        batch_cursors.push(batch.cursor());
472                        batch_storage.push(batch);
473                    }
474
475                    // Ensure that `capabilities` covers the capability of the batch.
476                    capabilities.retain(|cap| !capability.time().less_than(cap.time()));
477                    if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
478                        capabilities.push(capability.retain());
479                    }
480                });
481
482                // Pull in any subsequent empty batches we believe to exist.
483                source_trace.advance_upper(&mut upper_limit);
484
485                // Only if our upper limit has advanced should we do work.
486                if upper_limit != lower_limit {
487
488                    // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
489                    // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
490                    // to indicate forward progress, and must hope that downstream operators look at progress frontiers
491                    // as well as batch descriptions.
492                    //
493                    // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
494                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {
495
496                        // `interesting` contains "warnings" about keys and times that may need to be re-considered.
497                        // We first extract those times from this list that lie in the interval we will process.
498                        sort_dedup(&mut interesting);
499                        // `exposed` contains interesting (key, time)s now below `upper_limit`
500                        let mut exposed_keys = T1::KeyContainer::with_capacity(0);
501                        let mut exposed_time = T1::TimeContainer::with_capacity(0);
502                        // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
503                        interesting.retain(|(key, time)| {
504                            if upper_limit.less_equal(time) { true } else {
505                                exposed_keys.push_own(key);
506                                exposed_time.push_own(time);
507                                false
508                            }
509                        });
510
511                        // Prepare an output buffer and builder for each capability.
512                        //
513                        // We buffer and build separately, as outputs are produced grouped by time, whereas the
514                        // builder wants to see outputs grouped by value. While the per-key computation could
515                        // do the re-sorting itself, buffering per-key outputs lets us double check the results
516                        // against other implementations for accuracy.
517                        //
518                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
519                        //       this as long as it requires that there is only one capability for each message.
520                        let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
521                        let mut builders = Vec::new();
522                        for cap in capabilities.iter() {
523                            buffers.push((cap.time().clone(), Vec::new()));
524                            builders.push(Bu::new());
525                        }
526
527                        let mut buffer = Bu::Input::default();
528
529                        // cursors for navigating input and output traces.
530                        let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
531                        let source_storage = &source_storage;
532                        let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
533                        let output_storage = &output_storage;
534                        let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
535                        let batch_storage = &batch_storage;
536
537                        let mut thinker = history_replay::HistoryReplayer::new();
538
539                        // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
540                        //
541                        // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
542                        // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
543                        // There could perhaps be a less provocative variable name.
544                        let mut exposed_position = 0;
545                        while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {
546
547                            // Determine the next key we will work on; could be synthetic, could be from a batch.
548                            let key1 = exposed_keys.get(exposed_position);
549                            let key2 = batch_cursor.get_key(batch_storage);
550                            let key = match (key1, key2) {
551                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
552                                (Some(key1), None)       => key1,
553                                (None, Some(key2))       => key2,
554                                (None, None)             => unreachable!(),
555                            };
556
557                            // `interesting_times` contains those times between `lower_issued` and `upper_limit`
558                            // that we need to re-consider. We now populate it, but perhaps this should be left
559                            // to the per-key computation, which may be able to avoid examining the times of some
560                            // values (for example, in the case of min/max/topk).
561                            interesting_times.clear();
562
563                            // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
564                            while exposed_keys.get(exposed_position) == Some(key) {
565                                interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
566                                exposed_position += 1;
567                            }
568
569                            // tidy up times, removing redundancy.
570                            sort_dedup(&mut interesting_times);
571
572                            // do the per-key computation.
573                            let _counters = thinker.compute(
574                                key,
575                                (&mut source_cursor, source_storage),
576                                (&mut output_cursor, output_storage),
577                                (&mut batch_cursor, batch_storage),
578                                &mut interesting_times,
579                                &mut logic,
580                                &upper_limit,
581                                &mut buffers[..],
582                                &mut new_interesting_times,
583                            );
584
585                            if batch_cursor.get_key(batch_storage) == Some(key) {
586                                batch_cursor.step_key(batch_storage);
587                            }
588
589                            // Record future warnings about interesting times (and assert they should be "future").
590                            for time in new_interesting_times.drain(..) {
591                                debug_assert!(upper_limit.less_equal(&time));
592                                interesting.push((T1::owned_key(key), time));
593                            }
594
595                            // Sort each buffer by value and move into the corresponding builder.
596                            // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
597                            //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
598                            //       arbitrarily ordered times.
599                            for index in 0 .. buffers.len() {
600                                buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
601                                for (val, time, diff) in buffers[index].1.drain(..) {
602                                    buffer.push_into(((T1::owned_key(key), val), time, diff));
603                                    builders[index].push(&mut buffer);
604                                    buffer.clear();
605                                }
606                            }
607                        }
608
609                        // We start sealing output batches from the lower limit (previous upper limit).
610                        // In principle, we could update `lower_limit` itself, and it should arrive at
611                        // `upper_limit` by the end of the process.
612                        output_lower.clear();
613                        output_lower.extend(lower_limit.borrow().iter().cloned());
614
615                        // build and ship each batch (because only one capability per message).
616                        for (index, builder) in builders.drain(..).enumerate() {
617
618                            // Form the upper limit of the next batch, which includes all times greater
619                            // than the input batch, or the capabilities from i + 1 onward.
620                            output_upper.clear();
621                            output_upper.extend(upper_limit.borrow().iter().cloned());
622                            for capability in &capabilities[index + 1 ..] {
623                                output_upper.insert(capability.time().clone());
624                            }
625
626                            if output_upper.borrow() != output_lower.borrow() {
627
628                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
629                                let batch = builder.done(description);
630
631                                // ship batch to the output, and commit to the output trace.
632                                output.session(&capabilities[index]).give(batch.clone());
633                                output_writer.insert(batch, Some(capabilities[index].time().clone()));
634
635                                output_lower.clear();
636                                output_lower.extend(output_upper.borrow().iter().cloned());
637                            }
638                        }
639
640                        // This should be true, as the final iteration introduces no capabilities, and
641                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
642                        assert!(output_upper.borrow() == upper_limit.borrow());
643
644                        // Determine the frontier of our interesting times.
645                        let mut frontier = Antichain::<G::Timestamp>::new();
646                        for (_, time) in &interesting {
647                            frontier.insert_ref(time);
648                        }
649
650                        // Update `capabilities` to reflect interesting pairs described by `frontier`.
651                        let mut new_capabilities = Vec::new();
652                        for time in frontier.borrow().iter() {
653                            if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
654                                new_capabilities.push(cap.delayed(time));
655                            }
656                            else {
657                                println!("{}:\tfailed to find capability less than new frontier time:", id);
658                                println!("{}:\t  time: {:?}", id, time);
659                                println!("{}:\t  caps: {:?}", id, capabilities);
660                                println!("{}:\t  uppr: {:?}", id, upper_limit);
661                            }
662                        }
663                        capabilities = new_capabilities;
664
665                        // ensure that observed progress is reflected in the output.
666                        output_writer.seal(upper_limit.clone());
667                    }
668                    else {
669                        output_writer.seal(upper_limit.clone());
670                    }
671
672                    // We only anticipate future times in advance of `upper_limit`.
673                    source_trace.set_logical_compaction(upper_limit.borrow());
674                    output_reader.set_logical_compaction(upper_limit.borrow());
675
676                    // We will only slice the data between future batches.
677                    source_trace.set_physical_compaction(upper_limit.borrow());
678                    output_reader.set_physical_compaction(upper_limit.borrow());
679                }
680
681                // Exert trace maintenance if we have been so requested.
682                output_writer.exert();
683            }
684        }
685    )
686    };
687
688    Arranged {
689        stream,
690        trace: result_trace.unwrap(),
691    }
692}
693
694#[inline(never)]
695fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
696    list.dedup();
697    list.sort();
698    list.dedup();
699}
700
701trait PerKeyCompute<'a, C1, C2, C3, V>
702where
703    C1: Cursor,
704    C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
705    C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
706    V: Clone + Ord,
707{
708    fn new() -> Self;
709    fn compute<L>(
710        &mut self,
711        key: C1::Key<'a>,
712        source_cursor: (&mut C1, &'a C1::Storage),
713        output_cursor: (&mut C2, &'a C2::Storage),
714        batch_cursor: (&mut C3, &'a C3::Storage),
715        times: &mut Vec<C1::Time>,
716        logic: &mut L,
717        upper_limit: &Antichain<C1::Time>,
718        outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
719        new_interesting: &mut Vec<C1::Time>,
720    ) -> (usize, usize)
721    where
722        L: FnMut(
723            C1::Key<'a>,
724            &[(C1::Val<'a>, C1::Diff)],
725            &mut Vec<(V, C2::Diff)>,
726            &mut Vec<(V, C2::Diff)>,
727        );
728}
729
730/// Implementation based on replaying historical and new updates together.
731mod history_replay {
732
733    use timely::progress::Antichain;
734    use timely::PartialOrder;
735
736    use crate::lattice::Lattice;
737    use crate::operators::ValueHistory;
738    use crate::trace::Cursor;
739
740    use super::{sort_dedup, PerKeyCompute};
741
742    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
743    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
744    pub struct HistoryReplayer<'a, C1, C2, C3, V>
745    where
746        C1: Cursor,
747        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
748        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
749        V: Clone + Ord,
750    {
751        input_history: ValueHistory<'a, C1>,
752        output_history: ValueHistory<'a, C2>,
753        batch_history: ValueHistory<'a, C3>,
754        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
755        output_buffer: Vec<(V, C2::Diff)>,
756        update_buffer: Vec<(V, C2::Diff)>,
757        output_produced: Vec<((V, C2::Time), C2::Diff)>,
758        synth_times: Vec<C1::Time>,
759        meets: Vec<C1::Time>,
760        times_current: Vec<C1::Time>,
761        temporary: Vec<C1::Time>,
762    }
763
764    impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V>
765    where
766        C1: Cursor,
767        C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
768        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
769        V: Clone + Ord,
770    {
771        fn new() -> Self {
772            HistoryReplayer {
773                input_history: ValueHistory::new(),
774                output_history: ValueHistory::new(),
775                batch_history: ValueHistory::new(),
776                input_buffer: Vec::new(),
777                output_buffer: Vec::new(),
778                update_buffer: Vec::new(),
779                output_produced: Vec::new(),
780                synth_times: Vec::new(),
781                meets: Vec::new(),
782                times_current: Vec::new(),
783                temporary: Vec::new(),
784            }
785        }
786        #[inline(never)]
787        fn compute<L>(
788            &mut self,
789            key: C1::Key<'a>,
790            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
791            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
792            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
793            times: &mut Vec<C1::Time>,
794            logic: &mut L,
795            upper_limit: &Antichain<C1::Time>,
796            outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
797            new_interesting: &mut Vec<C1::Time>,
798        ) -> (usize, usize)
799        where
800            L: FnMut(
801                C1::Key<'a>,
802                &[(C1::Val<'a>, C1::Diff)],
803                &mut Vec<(V, C2::Diff)>,
804                &mut Vec<(V, C2::Diff)>,
805            ),
806        {
807            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
808            // and `times`, respectively "new work we just received" and "old times we were warned about".
809            //
810            // Our first step is to identify these times, so that we can use them to restrict the amount of
811            // information we need to recover from `input` and `output`; as all times of interest will have
812            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
813            // loaded times by performing the lattice `join` with this value.
814
815            // Load the batch contents.
816            let mut batch_replay =
817                self.batch_history
818                    .replay_key(batch_cursor, batch_storage, key, |time| {
819                        C3::owned_time(time)
820                    });
821
822            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
823            // can be used to advance other historical times, which may consolidate their representation. As
824            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
825            // history forward.
826
827            self.meets.clear();
828            self.meets.extend(times.iter().cloned());
829            for index in (1..self.meets.len()).rev() {
830                self.meets[index - 1] = self.meets[index - 1].meet(&self.meets[index]);
831            }
832
833            // Determine the meet of times in `batch` and `times`.
834            let mut meet = None;
835            update_meet(&mut meet, self.meets.get(0));
836            update_meet(&mut meet, batch_replay.meet());
837            // if let Some(time) = self.meets.get(0) {
838            //     meet = match meet {
839            //         None => Some(self.meets[0].clone()),
840            //         Some(x) => Some(x.meet(&self.meets[0])),
841            //     };
842            // }
843            // if let Some(time) = batch_replay.meet() {
844            //     meet = match meet {
845            //         None => Some(time.clone()),
846            //         Some(x) => Some(x.meet(&time)),
847            //     };
848            // }
849
850            // Having determined the meet, we can load the input and output histories, where we
851            // advance all times by joining them with `meet`. The resulting times are more compact
852            // and guaranteed to accumulate identically for times greater or equal to `meet`.
853
854            // Load the input and output histories.
855            let mut input_replay = if let Some(meet) = meet.as_ref() {
856                self.input_history
857                    .replay_key(source_cursor, source_storage, key, |time| {
858                        let mut time = C1::owned_time(time);
859                        time.join_assign(meet);
860                        time
861                    })
862            } else {
863                self.input_history
864                    .replay_key(source_cursor, source_storage, key, |time| {
865                        C1::owned_time(time)
866                    })
867            };
868            let mut output_replay = if let Some(meet) = meet.as_ref() {
869                self.output_history
870                    .replay_key(output_cursor, output_storage, key, |time| {
871                        let mut time = C2::owned_time(time);
872                        time.join_assign(meet);
873                        time
874                    })
875            } else {
876                self.output_history
877                    .replay_key(output_cursor, output_storage, key, |time| {
878                        C2::owned_time(time)
879                    })
880            };
881
882            self.synth_times.clear();
883            self.times_current.clear();
884            self.output_produced.clear();
885
886            // The frontier of times we may still consider.
887            // Derived from frontiers of our update histories, supplied times, and synthetic times.
888
889            let mut times_slice = &times[..];
890            let mut meets_slice = &self.meets[..];
891
892            let mut compute_counter = 0;
893            let mut output_counter = 0;
894
895            // We have candidate times from `batch` and `times`, as well as times identified by either
896            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
897            // we consider in the course of evaluation. As long as any of these times exist, we need to
898            // keep examining times.
899            while let Some(next_time) = [
900                batch_replay.time(),
901                times_slice.first(),
902                input_replay.time(),
903                output_replay.time(),
904                self.synth_times.last(),
905            ]
906            .iter()
907            .cloned()
908            .flatten()
909            .min()
910            .cloned()
911            {
912                // Advance input and output history replayers. This marks applicable updates as active.
913                input_replay.step_while_time_is(&next_time);
914                output_replay.step_while_time_is(&next_time);
915
916                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
917                // have any evidence that we should re-evaluate the user logic at this time. For a time
918                // to be "interesting" it would need to be the join of times that include either a time
919                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
920
921                // Advance batch history, and capture whether an update exists at `next_time`.
922                let mut interesting = batch_replay.step_while_time_is(&next_time);
923                if interesting {
924                    if let Some(meet) = meet.as_ref() {
925                        batch_replay.advance_buffer_by(meet);
926                    }
927                }
928
929                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
930                while self.synth_times.last() == Some(&next_time) {
931                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
932                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
933                    self.times_current.push(
934                        self.synth_times
935                            .pop()
936                            .expect("failed to pop from synth_times"),
937                    ); // <-- TODO: this could be a min-heap.
938                    interesting = true;
939                }
940                while times_slice.first() == Some(&next_time) {
941                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
942                    self.times_current.push(times_slice[0].clone());
943                    times_slice = &times_slice[1..];
944                    meets_slice = &meets_slice[1..];
945                    interesting = true;
946                }
947
948                // Times could also be interesting if an interesting time is less than them, as they would join
949                // and become the time itself. They may not equal the current time because whatever frontier we
950                // are tracking may not have advanced far enough.
951                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
952                //       yield false positives if not sufficiently compact. Maybe we should into this and see.
953                interesting = interesting
954                    || batch_replay
955                        .buffer()
956                        .iter()
957                        .any(|&((_, ref t), _)| t.less_equal(&next_time));
958                interesting =
959                    interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
960
961                // We should only process times that are not in advance of `upper_limit`.
962                //
963                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
964                // We may have the guarantee that synthetic times will not be, as we test against the limit
965                // before we add the time to `synth_times`.
966                if !upper_limit.less_equal(&next_time) {
967                    // We should re-evaluate the computation if this is an interesting time.
968                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
969                    // output produced. This sounds like a good test to have for debug builds!
970                    if interesting {
971                        compute_counter += 1;
972
973                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
974                        debug_assert!(self.input_buffer.is_empty());
975                        meet.as_ref()
976                            .map(|meet| input_replay.advance_buffer_by(meet));
977                        for &((value, ref time), ref diff) in input_replay.buffer().iter() {
978                            if time.less_equal(&next_time) {
979                                self.input_buffer.push((value, diff.clone()));
980                            } else {
981                                self.temporary.push(next_time.join(time));
982                            }
983                        }
984                        for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
985                            if time.less_equal(&next_time) {
986                                self.input_buffer.push((value, diff.clone()));
987                            } else {
988                                self.temporary.push(next_time.join(time));
989                            }
990                        }
991                        crate::consolidation::consolidate(&mut self.input_buffer);
992
993                        meet.as_ref()
994                            .map(|meet| output_replay.advance_buffer_by(meet));
995                        for &((value, ref time), ref diff) in output_replay.buffer().iter() {
996                            if time.less_equal(&next_time) {
997                                self.output_buffer
998                                    .push((C2::owned_val(value), diff.clone()));
999                            } else {
1000                                self.temporary.push(next_time.join(time));
1001                            }
1002                        }
1003                        for &((ref value, ref time), ref diff) in self.output_produced.iter() {
1004                            if time.less_equal(&next_time) {
1005                                self.output_buffer.push(((*value).to_owned(), diff.clone()));
1006                            } else {
1007                                self.temporary.push(next_time.join(time));
1008                            }
1009                        }
1010                        crate::consolidation::consolidate(&mut self.output_buffer);
1011
1012                        // Apply user logic if non-empty input and see what happens!
1013                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
1014                            logic(
1015                                key,
1016                                &self.input_buffer[..],
1017                                &mut self.output_buffer,
1018                                &mut self.update_buffer,
1019                            );
1020                            self.input_buffer.clear();
1021                            self.output_buffer.clear();
1022                        }
1023
1024                        // output_replay.advance_buffer_by(&meet);
1025                        // for &((ref value, ref time), diff) in output_replay.buffer().iter() {
1026                        //     if time.less_equal(&next_time) {
1027                        //         self.output_buffer.push(((*value).clone(), -diff));
1028                        //     }
1029                        //     else {
1030                        //         self.temporary.push(next_time.join(time));
1031                        //     }
1032                        // }
1033                        // for &((ref value, ref time), diff) in self.output_produced.iter() {
1034                        //     if time.less_equal(&next_time) {
1035                        //         self.output_buffer.push(((*value).clone(), -diff));
1036                        //     }
1037                        //     else {
1038                        //         self.temporary.push(next_time.join(&time));
1039                        //     }
1040                        // }
1041
1042                        // Having subtracted output updates from user output, consolidate the results to determine
1043                        // if there is anything worth reporting. Note: this also orders the results by value, so
1044                        // that could make the above merging plan even easier.
1045                        crate::consolidation::consolidate(&mut self.update_buffer);
1046
1047                        // Stash produced updates into both capability-indexed buffers and `output_produced`.
1048                        // The two locations are important, in that we will compact `output_produced` as we move
1049                        // through times, but we cannot compact the output buffers because we need their actual
1050                        // times.
1051                        if !self.update_buffer.is_empty() {
1052                            output_counter += 1;
1053
1054                            // We *should* be able to find a capability for `next_time`. Any thing else would
1055                            // indicate a logical error somewhere along the way; either we release a capability
1056                            // we should have kept, or we have computed the output incorrectly (or both!)
1057                            let idx = outputs
1058                                .iter()
1059                                .rev()
1060                                .position(|(time, _)| time.less_equal(&next_time));
1061                            let idx = outputs.len() - idx.expect("failed to find index") - 1;
1062                            for (val, diff) in self.update_buffer.drain(..) {
1063                                self.output_produced
1064                                    .push(((val.clone(), next_time.clone()), diff.clone()));
1065                                outputs[idx].1.push((val, next_time.clone(), diff));
1066                            }
1067
1068                            // Advance times in `self.output_produced` and consolidate the representation.
1069                            // NOTE: We only do this when we add records; it could be that there are situations
1070                            //       where we want to consolidate even without changes (because an initially
1071                            //       large collection can now be collapsed).
1072                            if let Some(meet) = meet.as_ref() {
1073                                for entry in &mut self.output_produced {
1074                                    (entry.0).1 = (entry.0).1.join(meet);
1075                                }
1076                            }
1077                            crate::consolidation::consolidate(&mut self.output_produced);
1078                        }
1079                    }
1080
1081                    // Determine synthetic interesting times.
1082                    //
1083                    // Synthetic interesting times are produced differently for interesting and uninteresting
1084                    // times. An uninteresting time must join with an interesting time to become interesting,
1085                    // which means joins with `self.batch_history` and  `self.times_current`. I think we can
1086                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
1087                    // joined against everything.
1088
1089                    // Any time, even uninteresting times, must be joined with the current accumulation of
1090                    // batch times as well as the current accumulation of `times_current`.
1091                    for &((_, ref time), _) in batch_replay.buffer().iter() {
1092                        if !time.less_equal(&next_time) {
1093                            self.temporary.push(time.join(&next_time));
1094                        }
1095                    }
1096                    for time in self.times_current.iter() {
1097                        if !time.less_equal(&next_time) {
1098                            self.temporary.push(time.join(&next_time));
1099                        }
1100                    }
1101
1102                    sort_dedup(&mut self.temporary);
1103
1104                    // Introduce synthetic times, and re-organize if we add any.
1105                    let synth_len = self.synth_times.len();
1106                    for time in self.temporary.drain(..) {
1107                        // We can either service `join` now, or must delay for the future.
1108                        if upper_limit.less_equal(&time) {
1109                            debug_assert!(outputs.iter().any(|(t, _)| t.less_equal(&time)));
1110                            new_interesting.push(time);
1111                        } else {
1112                            self.synth_times.push(time);
1113                        }
1114                    }
1115                    if self.synth_times.len() > synth_len {
1116                        self.synth_times.sort_by(|x, y| y.cmp(x));
1117                        self.synth_times.dedup();
1118                    }
1119                } else if interesting {
1120                    // We cannot process `next_time` now, and must delay it.
1121                    //
1122                    // I think we are probably only here because of an uninteresting time declared interesting,
1123                    // as initial interesting times are filtered to be in interval, and synthetic times are also
1124                    // filtered before introducing them to `self.synth_times`.
1125                    new_interesting.push(next_time.clone());
1126                    debug_assert!(outputs.iter().any(|(t, _)| t.less_equal(&next_time)))
1127                }
1128
1129                // Update `meet` to track the meet of each source of times.
1130                meet = None; //T::maximum();
1131                update_meet(&mut meet, batch_replay.meet());
1132                update_meet(&mut meet, input_replay.meet());
1133                update_meet(&mut meet, output_replay.meet());
1134                for time in self.synth_times.iter() {
1135                    update_meet(&mut meet, Some(time));
1136                }
1137                // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); }
1138                // if let Some(time) = input_replay.meet() { meet = meet.meet(time); }
1139                // if let Some(time) = output_replay.meet() { meet = meet.meet(time); }
1140                // for time in self.synth_times.iter() { meet = meet.meet(time); }
1141                update_meet(&mut meet, meets_slice.first());
1142                // if let Some(time) = meets_slice.first() { meet = meet.meet(time); }
1143
1144                // Update `times_current` by the frontier.
1145                if let Some(meet) = meet.as_ref() {
1146                    for time in self.times_current.iter_mut() {
1147                        *time = time.join(meet);
1148                    }
1149                }
1150
1151                sort_dedup(&mut self.times_current);
1152            }
1153
1154            // Normalize the representation of `new_interesting`, deduplicating and ordering.
1155            sort_dedup(new_interesting);
1156
1157            (compute_counter, output_counter)
1158        }
1159    }
1160
1161    /// Updates an optional meet by an optional time.
1162    fn update_meet<T: Lattice + Clone>(meet: &mut Option<T>, other: Option<&T>) {
1163        if let Some(time) = other {
1164            if let Some(meet) = meet.as_mut() {
1165                *meet = meet.meet(time);
1166            }
1167            if meet.is_none() {
1168                *meet = Some(time.clone());
1169            }
1170        }
1171    }
1172}