differential_dataflow/operators/
reduce.rs

1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use crate::hashable::Hashable;
9use crate::{Data, ExchangeData, Collection};
10use crate::difference::{Semigroup, Abelian};
11
12use timely::order::PartialOrder;
13use timely::progress::frontier::Antichain;
14use timely::progress::Timestamp;
15use timely::dataflow::*;
16use timely::dataflow::operators::Operator;
17use timely::dataflow::channels::pact::Pipeline;
18use timely::dataflow::operators::Capability;
19
20use crate::operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
21use crate::lattice::Lattice;
22use crate::trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
23use crate::trace::cursor::CursorList;
24use crate::trace::implementations::{KeySpine, ValSpine};
25
26use crate::trace::TraceReader;
27
28/// Extension trait for the `reduce` differential dataflow method.
pub trait Reduce<G: Scope, K: Data, V: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Applies a reduction function on records grouped by key.
    ///
    /// Input data must be structured as `(key, val)` pairs.
    /// The user-supplied reduction function takes as arguments
    ///
    /// 1. a reference to the key,
    /// 2. a reference to the slice of values and their accumulated updates,
    /// 3. a mutable reference to a vector to populate with output values and accumulated updates.
    ///
    /// The user logic is only invoked for non-empty input collections, and it is safe to assume that the
    /// slice of input values is non-empty. The values are presented in sorted order, as defined by their
    /// `Ord` implementations.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Reduce;
    ///
    /// ::timely::example(|scope| {
    ///     // report the smallest value for each group
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| (x / 3, x))
    ///          .reduce(|_key, input, output| {
    ///              output.push((*input[0].0, 1))
    ///          });
    /// });
    /// ```
    fn reduce<L, V2: Data, R2: Abelian>(&self, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
        self.reduce_named("Reduce", logic)
    }

    /// As `reduce` with the ability to name the operator.
    ///
    /// The name appears in timely dataflow's operator diagnostics, which helps
    /// attribute work when a dataflow contains several reductions.
    fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
    where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static;
}
67
68impl<G, K, V, R> Reduce<G, K, V, R> for Collection<G, (K, V), R>
69    where
70        G: Scope,
71        G::Timestamp: Lattice+Ord,
72        K: ExchangeData+Hashable,
73        V: ExchangeData,
74        R: ExchangeData+Semigroup,
75 {
76    fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
77        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
78        self.arrange_by_key_named(&format!("Arrange: {}", name))
79            .reduce_named(name, logic)
80    }
81}
82
83impl<G: Scope, K: Data, V: Data, T1, R: Semigroup> Reduce<G, K, V, R> for Arranged<G, T1>
84where
85    G::Timestamp: Lattice+Ord,
86    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned=K, Val<'a>=&'a V, Time=G::Timestamp, Diff=R>+Clone+'static,
87{
88    fn reduce_named<L, V2: Data, R2: Abelian>(&self, name: &str, logic: L) -> Collection<G, (K, V2), R2>
89        where L: FnMut(&K, &[(&V, R)], &mut Vec<(V2, R2)>)+'static {
90        self.reduce_abelian::<_,ValSpine<_,_,_,_>>(name, logic)
91            .as_collection(|k,v| (k.clone(), v.clone()))
92    }
93}
94
95/// Extension trait for the `threshold` and `distinct` differential dataflow methods.
pub trait Threshold<G: Scope, K: Data, R1: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Transforms the multiplicity of records.
    ///
    /// The `threshold` function is obliged to map `R1::zero` to `R2::zero`, or at
    /// least the computation may behave as if it does. Otherwise, the transformation
    /// can be nearly arbitrary: the code does not assume any properties of `threshold`.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .threshold(|_,c| c % 2);
    /// });
    /// ```
    fn threshold<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, thresh: F) -> Collection<G, K, R2> {
        self.threshold_named("Threshold", thresh)
    }

    /// A `threshold` with the ability to name the operator.
    fn threshold_named<R2: Abelian, F: FnMut(&K, &R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2>;

    /// Reduces the collection to one occurrence of each distinct element.
    ///
    /// This is `distinct_core` specialized to an `isize` difference type.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Threshold;
    ///
    /// ::timely::example(|scope| {
    ///     // report at most one of each key.
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .distinct();
    /// });
    /// ```
    fn distinct(&self) -> Collection<G, K, isize> {
        self.distinct_core()
    }

    /// Distinct for general integer differences.
    ///
    /// This method allows `distinct` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`. Each element present in the input appears with multiplicity one
    /// in the output, regardless of its input multiplicity.
    fn distinct_core<R2: Abelian+From<i8>>(&self) -> Collection<G, K, R2> {
        self.threshold_named("Distinct", |_,_| R2::from(1i8))
    }
}
151
152impl<G: Scope, K: ExchangeData+Hashable, R1: ExchangeData+Semigroup> Threshold<G, K, R1> for Collection<G, K, R1>
153where G::Timestamp: Lattice+Ord {
154    fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, thresh: F) -> Collection<G, K, R2> {
155        self.arrange_by_self_named(&format!("Arrange: {}", name))
156            .threshold_named(name, thresh)
157    }
158}
159
160impl<G: Scope, K: Data, T1, R1: Semigroup> Threshold<G, K, R1> for Arranged<G, T1>
161where
162    G::Timestamp: Lattice+Ord,
163    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R1>+Clone+'static,
164{
165    fn threshold_named<R2: Abelian, F: FnMut(&K,&R1)->R2+'static>(&self, name: &str, mut thresh: F) -> Collection<G, K, R2> {
166        self.reduce_abelian::<_,KeySpine<_,_,_>>(name, move |k,s,t| t.push(((), thresh(k, &s[0].1))))
167            .as_collection(|k,_| k.clone())
168    }
169}
170
171/// Extension trait for the `count` differential dataflow method.
pub trait Count<G: Scope, K: Data, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Counts the number of occurrences of each element.
    ///
    /// The output collection contains `(key, count)` pairs, where `count` is the
    /// accumulated difference of type `R` for that key in the input.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::Count;
    ///
    /// ::timely::example(|scope| {
    ///     // report the number of occurrences of each key
    ///     scope.new_collection_from(1 .. 10).1
    ///          .map(|x| x / 3)
    ///          .count();
    /// });
    /// ```
    fn count(&self) -> Collection<G, (K, R), isize> {
        self.count_core()
    }

    /// Count for general integer differences.
    ///
    /// This method allows `count` to produce collections whose difference
    /// type is something other than an `isize` integer, for example perhaps an
    /// `i32`.
    fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2>;
}
199
200impl<G: Scope, K: ExchangeData+Hashable, R: ExchangeData+Semigroup> Count<G, K, R> for Collection<G, K, R>
201where
202    G::Timestamp: Lattice+Ord,
203{
204    fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
205        self.arrange_by_self_named("Arrange: Count")
206            .count_core()
207    }
208}
209
210impl<G: Scope, K: Data, T1, R: Semigroup> Count<G, K, R> for Arranged<G, T1>
211where
212    G::Timestamp: Lattice+Ord,
213    T1: for<'a> TraceReader<Key<'a>=&'a K, KeyOwned=K, Val<'a>=&'a (), Time=G::Timestamp, Diff=R>+Clone+'static,
214{
215    fn count_core<R2: Abelian + From<i8>>(&self) -> Collection<G, (K, R), R2> {
216        self.reduce_abelian::<_,ValSpine<_,_,_,_>>("Count", |_k,s,t| t.push((s[0].1.clone(), R2::from(1i8))))
217            .as_collection(|k,c| (k.clone(), c.clone()))
218    }
219}
220
221/// Extension trait for the `reduce_core` differential dataflow method.
pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semigroup> where G::Timestamp: Lattice+Ord {
    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
    ///
    /// This method is used by the more ergonomic `reduce`, `distinct`, and `count` methods, although
    /// it can be very useful if one needs to manually attach and re-use existing arranged collections.
    ///
    /// # Examples
    ///
    /// ```
    /// use differential_dataflow::input::Input;
    /// use differential_dataflow::operators::reduce::ReduceCore;
    /// use differential_dataflow::trace::Trace;
    /// use differential_dataflow::trace::implementations::ValSpine;
    ///
    /// ::timely::example(|scope| {
    ///
    ///     let trace =
    ///     scope.new_collection_from(1 .. 10u32).1
    ///          .map(|x| (x, x))
    ///          .reduce_abelian::<_,ValSpine<_,_,_,_>>(
    ///             "Example",
    ///              move |_key, src, dst| dst.push((*src[0].0, 1))
    ///          )
    ///          .trace;
    /// });
    /// ```
    fn reduce_abelian<L, T2>(&self, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<Key<'a>= &'a K, Time=G::Timestamp>+'static,
            T2::ValOwned: Data,
            T2::Diff: Abelian,
            T2::Batch: Batch,
            T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
        {
            self.reduce_core::<_,T2>(name, move |key, input, output, change| {
                // Invoke user logic only on non-empty input, matching `reduce`'s
                // documented contract that the input slice is non-empty.
                if !input.is_empty() {
                    logic(key, input, change);
                }
                // Subtract the previously produced output, so that `change` holds the
                // difference between the desired and the existing output. Negation is
                // why this method requires `T2::Diff: Abelian`.
                change.extend(output.drain(..).map(|(x,d)| (x, d.negate())));
                // Coalesce updates to identical values, discarding zero differences.
                crate::consolidation::consolidate(change);
            })
        }

    /// Solves for output updates when presented with inputs and would-be outputs.
    ///
    /// Unlike `reduce_arranged`, this method may be called with an empty `input`,
    /// and it may not be safe to index into the first element.
    /// At least one of the two collections will be non-empty.
    fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
        where
            T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
            T2::ValOwned: Data,
            T2::Diff: Semigroup,
            T2::Batch: Batch,
            T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
            L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
            ;
}
281
282impl<G, K, V, R> ReduceCore<G, K, V, R> for Collection<G, (K, V), R>
283where
284    G: Scope,
285    G::Timestamp: Lattice+Ord,
286    K: ExchangeData+Hashable,
287    K: ToOwned<Owned = K>,
288    V: ExchangeData,
289    R: ExchangeData+Semigroup,
290{
291    fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
292        where
293            T2::ValOwned: Data,
294            T2::Diff: Semigroup,
295            T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
296            T2::Batch: Batch,
297            T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
298            L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
299    {
300        self.arrange_by_key_named(&format!("Arrange: {}", name))
301            .reduce_core(name, logic)
302    }
303}
304
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
/// It returns the reduced output both as a stream of batches and as a trace agent for `T2`,
/// bundled in an `Arranged`.
pub fn reduce_trace<G, T1, T2, L>(trace: &Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
where
    G: Scope,
    G::Timestamp: Lattice+Ord,
    T1: TraceReader<Time=G::Timestamp> + Clone + 'static,
    T1::Diff: Semigroup,
    T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=G::Timestamp> + 'static,
    T2::ValOwned: Data,
    T2::Diff: Semigroup,
    T2::Batch: Batch,
    T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_frontier` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for differential "arrange" events, if one is registered.
            let logger = {
                let scope = trace.stream.scope();
                let register = scope.log_register();
                register.get::<crate::logging::DifferentialEvent>("differential/arrange")
            };

            let activator = Some(trace.stream.scope().activator_for(&operator_info.address[..]));
            let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = trace.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }


            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // let mut output_trace = TraceRc::make_from(agent).0;
            *result_trace = Some(output_reader.clone());

            // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
            let mut new_interesting_times = Vec::<G::Timestamp>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // as well as capabilities for these times (or their lower envelope, at least).
            let mut interesting = Vec::<(T1::KeyOwned, G::Timestamp)>::new();
            let mut capabilities = Vec::<Capability<G::Timestamp>>::new();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<G::Timestamp>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temporary antichains help there.
            let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

            let mut input_buffer = Vec::new();

            // Worker index; used only in the diagnostic prints below.
            let id = trace.stream.scope().index();

            move |input, output| {

                // The `reduce` operator receives fully formed batches, which each serve as an indication
                // that the frontier has advanced to the upper bound of their description.
                //
                // Although we could act on each individually, several may have been sent, and it makes
                // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
                // attention to which times need to be collected under which capability, so that we can
                // assemble output batches correctly. We will maintain several builders concurrently, and
                // place output updates into the appropriate builder.
                //
                // It turns out we must use notificators, as we cannot await empty batches from arrange to
                // indicate progress, as the arrange may not hold the capability to send such. Instead, we
                // must watch for progress here (and the upper bound of received batches) to tell us how
                // far we can process work.
                //
                // We really want to retire all batches we receive, so we want a frontier which reflects
                // both information from batches as well as progress information. I think this means that
                // we keep times that are greater than or equal to a time in the other frontier, deduplicated.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain the input stream of batches, validating the contiguity of the batch descriptions and
                // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
                // times in the batch.
                input.for_each(|capability, batches| {

                    batches.swap(&mut input_buffer);
                    for batch in input_buffer.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }

                    // Ensure that `capabilities` covers the capability of the batch.
                    capabilities.retain(|cap| !capability.time().less_than(cap.time()));
                    if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
                        capabilities.push(capability.retain());
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);

                // Only if our upper limit has advanced should we do work.
                if upper_limit != lower_limit {

                    // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
                    // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
                    // to indicate forward progress, and must hope that downstream operators look at progress frontiers
                    // as well as batch descriptions.
                    //
                    // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // `interesting` contains "warnings" about keys and times that may need to be re-considered.
                        // We first extract those times from this list that lie in the interval we will process.
                        sort_dedup(&mut interesting);
                        // `exposed` contains interesting (key, time)s now below `upper_limit`
                        let exposed = {
                            let (exposed, new_interesting) = interesting.drain(..).partition(|(_, time)| !upper_limit.less_equal(time));
                            interesting = new_interesting;
                            exposed
                        };

                        // Prepare an output buffer and builder for each capability.
                        //
                        // We buffer and build separately, as outputs are produced grouped by time, whereas the
                        // builder wants to see outputs grouped by value. While the per-key computation could
                        // do the re-sorting itself, buffering per-key outputs lets us double check the results
                        // against other implementations for accuracy.
                        //
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(G::Timestamp, Vec<(<T2::Cursor as Cursor>::ValOwned, G::Timestamp, T2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(T2::Builder::new());
                        }

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let source_storage = &source_storage;
                        let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let output_storage = &output_storage;
                        let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
                        let batch_storage = &batch_storage;

                        let mut thinker = history_replay::HistoryReplayer::new();

                        // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
                        //
                        // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
                        // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
                        // There could perhaps be a less provocative variable name.
                        let mut exposed_position = 0;
                        while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() {

                            use std::borrow::Borrow;
                            use crate::trace::cursor::MyTrait;

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0));
                            let key2 = batch_cursor.get_key(batch_storage);
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None)       => key1,
                                (None, Some(key2))       => key2,
                                (None, None)             => unreachable!(),
                            };

                            // `interesting_times` contains those times between `lower_issued` and `upper_limit`
                            // that we need to re-consider. We now populate it, but perhaps this should be left
                            // to the per-key computation, which may be able to avoid examining the times of some
                            // values (for example, in the case of min/max/topk).
                            interesting_times.clear();

                            // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
                            while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) {
                                interesting_times.push(exposed[exposed_position].1.clone());
                                exposed_position += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // do the per-key computation.
                            let _counters = thinker.compute(
                                key,
                                (&mut source_cursor, source_storage),
                                (&mut output_cursor, output_storage),
                                (&mut batch_cursor, batch_storage),
                                &mut interesting_times,
                                &mut logic,
                                &upper_limit,
                                &mut buffers[..],
                                &mut new_interesting_times,
                            );

                            // Advance past this key in the batch input, if it was the batch's current key.
                            if batch_cursor.get_key(batch_storage) == Some(key) {
                                batch_cursor.step_key(batch_storage);
                            }

                            // Record future warnings about interesting times (and assert they should be "future").
                            for time in new_interesting_times.drain(..) {
                                debug_assert!(upper_limit.less_equal(&time));
                                interesting.push((key.into_owned(), time));
                            }

                            // Sort each buffer by value and move into the corresponding builder.
                            // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                            //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                            //       arbitrarily ordered times.
                            for index in 0 .. buffers.len() {
                                buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                for (val, time, diff) in buffers[index].1.drain(..) {
                                    builders[index].push(((key.into_owned(), val), time, diff));
                                }
                            }
                        }

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert(capability.time().clone());
                            }

                            // Skip degenerate (empty-interval) batches.
                            if output_upper.borrow() != output_lower.borrow() {

                                let batch = builder.done(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }

                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Determine the frontier of our interesting times.
                        let mut frontier = Antichain::<G::Timestamp>::new();
                        for (_, time) in &interesting {
                            frontier.insert(time.clone());
                        }

                        // Update `capabilities` to reflect interesting pairs described by `frontier`.
                        let mut new_capabilities = Vec::new();
                        for time in frontier.borrow().iter() {
                            if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
                                new_capabilities.push(cap.delayed(time));
                            }
                            else {
                                // This should not happen; report diagnostics rather than panic.
                                println!("{}:\tfailed to find capability less than new frontier time:", id);
                                println!("{}:\t  time: {:?}", id, time);
                                println!("{}:\t  caps: {:?}", id, capabilities);
                                println!("{}:\t  uppr: {:?}", id, upper_limit);
                            }
                        }
                        capabilities = new_capabilities;

                        // ensure that observed progress is reflected in the output.
                        output_writer.seal(upper_limit.clone());
                    }
                    else {
                        output_writer.seal(upper_limit.clone());
                    }

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
    )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
621
622
/// Sorts `list` and removes all duplicate elements.
///
/// The leading `dedup` is a cheap linear pre-pass that shrinks runs of equal
/// elements (common when the same time is pushed repeatedly) before paying
/// for the sort; the trailing `dedup` removes the remaining duplicates,
/// which sorting has made adjacent.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    list.dedup();
    // `sort_unstable` avoids the allocation of the stable merge sort;
    // stability is irrelevant here, as duplicates are removed right after.
    list.sort_unstable();
    list.dedup();
}
629
/// A per-key strategy for computing reduction output from update histories.
///
/// Implementors replay the historical input updates (`C1`), previously
/// produced output updates (`C2`), and newly arrived batch updates (`C3`)
/// for a single key, re-evaluating the user logic at each time that may
/// require corrected output.
trait PerKeyCompute<'a, C1, C2, C3>
where
    C1: Cursor,
    C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
    C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
    C2::ValOwned: Ord + Clone,
    C1::Time: Lattice+Ord+Clone,
    C1::Diff: Semigroup,
    C2::Diff: Semigroup,
{
    /// Creates a new instance of the strategy, with empty internal state.
    fn new() -> Self;
    /// Computes output updates for `key` from the supplied cursors.
    ///
    /// Each cursor argument is paired with the storage it references:
    /// `source_cursor` for input updates, `output_cursor` for previously
    /// produced output, and `batch_cursor` for newly arrived updates.
    /// `times` lists previously identified "interesting" times for this key,
    /// and `upper_limit` bounds the times that may be processed now; times
    /// at or beyond the limit are deposited in `new_interesting` instead.
    /// Produced updates are appended to the capability-indexed `outputs`.
    /// Returns counts of (logic invocations, times producing output).
    fn compute<L>(
        &mut self,
        key: C1::Key<'a>,
        source_cursor: (&mut C1, &'a C1::Storage),
        output_cursor: (&mut C2, &'a C2::Storage),
        batch_cursor: (&mut C3, &'a C3::Storage),
        times: &mut Vec<C1::Time>,
        logic: &mut L,
        upper_limit: &Antichain<C1::Time>,
        outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)],
        new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
    where
        L: FnMut(
            C1::Key<'a>, 
            &[(C1::Val<'a>, C1::Diff)],
            &mut Vec<(C2::ValOwned, C2::Diff)>,
            &mut Vec<(C2::ValOwned, C2::Diff)>,
        );
}
660
661
662/// Implementation based on replaying historical and new updates together.
663mod history_replay {
664
665    use crate::difference::Semigroup;
666    use crate::lattice::Lattice;
667    use crate::trace::Cursor;
668    use crate::operators::ValueHistory;
669    use timely::progress::Antichain;
670
671    use timely::PartialOrder;
672
673    use super::{PerKeyCompute, sort_dedup};
674
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        C2::ValOwned: Ord + Clone,
        C1::Time: Lattice+Ord+Clone,
        C1::Diff: Semigroup,
        C2::Diff: Semigroup,
    {
        // Replayed update history from the input (source) trace.
        input_history: ValueHistory<'a, C1>,
        // Replayed update history from the output trace.
        output_history: ValueHistory<'a, C2>,
        // Replayed update history from newly received batches.
        batch_history: ValueHistory<'a, C3>,
        // Staging buffer for the input collection accumulated at one time.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        // Staging buffer for the output collection accumulated at one time.
        output_buffer: Vec<(C2::ValOwned, C2::Diff)>,
        // Buffer the user logic populates with changes to the output.
        update_buffer: Vec<(C2::ValOwned, C2::Diff)>,
        // Output updates produced so far in the current invocation; their
        // times are advanced (joined with the running meet) as we proceed.
        output_produced: Vec<((C2::ValOwned, C2::Time), C2::Diff)>,
        // Synthetic interesting times (joins of observed times) still to be
        // processed; kept sorted descending so the minimum is at the end.
        synth_times: Vec<C1::Time>,
        // Meets of each suffix of the supplied interesting times.
        meets: Vec<C1::Time>,
        // Interesting times seen so far, advanced by the running meet.
        times_current: Vec<C1::Time>,
        // Scratch space for candidate future times.
        temporary: Vec<C1::Time>,
    }
699
    impl<'a, C1, C2, C3> PerKeyCompute<'a, C1, C2, C3> for HistoryReplayer<'a, C1, C2, C3>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        C2::ValOwned: Ord + Clone,
        C1::Time: Lattice+Ord+Clone,
        C1::Diff: Semigroup,
        C2::Diff: Semigroup,
    {
        // Creates a replayer with all internal buffers empty; the buffers are
        // reused across calls to `compute` to avoid repeated allocation.
        fn new() -> Self {
            HistoryReplayer {
                input_history: ValueHistory::new(),
                output_history: ValueHistory::new(),
                batch_history: ValueHistory::new(),
                input_buffer: Vec::new(),
                output_buffer: Vec::new(),
                update_buffer: Vec::new(),
                output_produced: Vec::new(),
                synth_times: Vec::new(),
                meets: Vec::new(),
                times_current: Vec::new(),
                temporary: Vec::new(),
            }
        }
        // Replays the update histories for `key` in time order, re-evaluating
        // `logic` at each "interesting" time not beyond `upper_limit`; stages
        // produced updates into `outputs`, and records times at or beyond
        // `upper_limit` in `new_interesting` for later processing.
        // Returns (number of logic invocations, number of times with output).
        #[inline(never)]
        fn compute<L>(
            &mut self,
            key: C1::Key<'a>,
            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
            times: &mut Vec<C1::Time>,
            logic: &mut L,
            upper_limit: &Antichain<C1::Time>,
            outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)],
            new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
        where
            L: FnMut(
                C1::Key<'a>, 
                &[(C1::Val<'a>, C1::Diff)],
                &mut Vec<(C2::ValOwned, C2::Diff)>,
                &mut Vec<(C2::ValOwned, C2::Diff)>,
            )
        {

            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
            // and `times`, respectively "new work we just received" and "old times we were warned about".
            //
            // Our first step is to identify these times, so that we can use them to restrict the amount of
            // information we need to recover from `input` and `output`; as all times of interest will have
            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
            // loaded times by performing the lattice `join` with this value.

            // Load the batch contents.
            let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| time.clone());

            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
            // can be used to advance other historical times, which may consolidate their representation. As
            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
            // history forward.

            self.meets.clear();
            self.meets.extend(times.iter().cloned());
            // Compute suffix meets in place: `meets[i]` becomes the meet of `times[i ..]`.
            for index in (1 .. self.meets.len()).rev() {
                self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
            }

            // Determine the meet of times in `batch` and `times`.
            let mut meet = None;
            update_meet(&mut meet, self.meets.get(0));
            update_meet(&mut meet, batch_replay.meet());

            // Having determined the meet, we can load the input and output histories, where we
            // advance all times by joining them with `meet`. The resulting times are more compact
            // and guaranteed to accumulate identically for times greater or equal to `meet`.

            // Load the input and output histories.
            let mut input_replay = if let Some(meet) = meet.as_ref() {
                self.input_history.replay_key(source_cursor, source_storage, key, |time| time.join(meet))
            }
            else {
                self.input_history.replay_key(source_cursor, source_storage, key, |time| time.clone())
            };
            let mut output_replay = if let Some(meet) = meet.as_ref() {
                self.output_history.replay_key(output_cursor, output_storage, key, |time| time.join(meet))
            }
            else {
                self.output_history.replay_key(output_cursor, output_storage, key, |time| time.clone())
            };

            self.synth_times.clear();
            self.times_current.clear();
            self.output_produced.clear();

            // The frontier of times we may still consider.
            // Derived from frontiers of our update histories, supplied times, and synthetic times.

            let mut times_slice = &times[..];
            let mut meets_slice = &self.meets[..];

            // Counts of user-logic invocations and of times producing output,
            // returned to the caller for diagnostics.
            let mut compute_counter = 0;
            let mut output_counter = 0;

            // We have candidate times from `batch` and `times`, as well as times identified by either
            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
            // we consider in the course of evaluation. As long as any of these times exist, we need to
            // keep examining times.
            while let Some(next_time) = [   batch_replay.time(),
                                            times_slice.first(),
                                            input_replay.time(),
                                            output_replay.time(),
                                            self.synth_times.last(),
                                        ].iter().cloned().flatten().min().cloned() {

                // Advance input and output history replayers. This marks applicable updates as active.
                input_replay.step_while_time_is(&next_time);
                output_replay.step_while_time_is(&next_time);

                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
                // have any evidence that we should re-evaluate the user logic at this time. For a time
                // to be "interesting" it would need to be the join of times that include either a time
                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.

                // Advance batch history, and capture whether an update exists at `next_time`.
                let mut interesting = batch_replay.step_while_time_is(&next_time);
                if interesting {
                    if let Some(meet) = meet.as_ref() {
                        batch_replay.advance_buffer_by(meet);
                    }
                }

                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
                while self.synth_times.last() == Some(&next_time) {
                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
                    self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
                    interesting = true;
                }
                while times_slice.first() == Some(&next_time) {
                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
                    self.times_current.push(times_slice[0].clone());
                    times_slice = &times_slice[1..];
                    meets_slice = &meets_slice[1..];
                    interesting = true;
                }

                // Times could also be interesting if an interesting time is less than them, as they would join
                // and become the time itself. They may not equal the current time because whatever frontier we
                // are tracking may not have advanced far enough.
                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
                //       yield false positives if not sufficiently compact. Maybe we should look into this and see.
                interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
                interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));

                // We should only process times that are not in advance of `upper_limit`.
                //
                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
                // We may have the guarantee that synthetic times will not be, as we test against the limit
                // before we add the time to `synth_times`.
                if !upper_limit.less_equal(&next_time) {

                    // We should re-evaluate the computation if this is an interesting time.
                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
                    // output produced. This sounds like a good test to have for debug builds!
                    if interesting {

                        compute_counter += 1;

                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
                        debug_assert!(self.input_buffer.is_empty());
                        meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet));
                        for &((value, ref time), ref diff) in input_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                self.input_buffer.push((value, diff.clone()));
                            }
                            else {
                                // Not yet active; its join with `next_time` may become interesting later.
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                self.input_buffer.push((value, diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        crate::consolidation::consolidate(&mut self.input_buffer);

                        // Assemble the output collection at `next_time`, drawing from the output
                        // trace and from updates produced earlier in this invocation.
                        meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet));
                        for &((value, ref time), ref diff) in output_replay.buffer().iter() {
                            if time.less_equal(&next_time) {
                                use crate::trace::cursor::MyTrait;
                                self.output_buffer.push((<_ as MyTrait>::into_owned(value), diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        for &((ref value, ref time), ref diff) in self.output_produced.iter() {
                            if time.less_equal(&next_time) {
                                self.output_buffer.push(((*value).to_owned(), diff.clone()));
                            }
                            else {
                                self.temporary.push(next_time.join(time));
                            }
                        }
                        crate::consolidation::consolidate(&mut self.output_buffer);

                        // Apply user logic if non-empty input and see what happens!
                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
                            logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
                            self.input_buffer.clear();
                            self.output_buffer.clear();
                        }

                        // Having subtracted output updates from user output, consolidate the results to determine
                        // if there is anything worth reporting. Note: this also orders the results by value, so
                        // that could make the above merging plan even easier.
                        crate::consolidation::consolidate(&mut self.update_buffer);

                        // Stash produced updates into both capability-indexed buffers and `output_produced`.
                        // The two locations are important, in that we will compact `output_produced` as we move
                        // through times, but we cannot compact the output buffers because we need their actual
                        // times.
                        if !self.update_buffer.is_empty() {

                            output_counter += 1;

                            // We *should* be able to find a capability for `next_time`. Any thing else would
                            // indicate a logical error somewhere along the way; either we release a capability
                            // we should have kept, or we have computed the output incorrectly (or both!)
                            let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
                            let idx = outputs.len() - idx.expect("failed to find index") - 1;
                            for (val, diff) in self.update_buffer.drain(..) {
                                self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
                                outputs[idx].1.push((val, next_time.clone(), diff));
                            }

                            // Advance times in `self.output_produced` and consolidate the representation.
                            // NOTE: We only do this when we add records; it could be that there are situations
                            //       where we want to consolidate even without changes (because an initially
                            //       large collection can now be collapsed).
                            if let Some(meet) = meet.as_ref() {
                                for entry in &mut self.output_produced {
                                    (entry.0).1 = (entry.0).1.join(meet);
                                }
                            }
                            crate::consolidation::consolidate(&mut self.output_produced);
                        }
                    }

                    // Determine synthetic interesting times.
                    //
                    // Synthetic interesting times are produced differently for interesting and uninteresting
                    // times. An uninteresting time must join with an interesting time to become interesting,
                    // which means joins with `self.batch_history` and `self.times_current`. I think we can
                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
                    // joined against everything.

                    // Any time, even uninteresting times, must be joined with the current accumulation of
                    // batch times as well as the current accumulation of `times_current`.
                    for &((_, ref time), _) in batch_replay.buffer().iter() {
                        if !time.less_equal(&next_time) {
                            self.temporary.push(time.join(&next_time));
                        }
                    }
                    for time in self.times_current.iter() {
                        if !time.less_equal(&next_time) {
                            self.temporary.push(time.join(&next_time));
                        }
                    }

                    sort_dedup(&mut self.temporary);

                    // Introduce synthetic times, and re-organize if we add any.
                    let synth_len = self.synth_times.len();
                    for time in self.temporary.drain(..) {
                        // We can either service `join` now, or must delay for the future.
                        if upper_limit.less_equal(&time) {
                            debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
                            new_interesting.push(time);
                        }
                        else {
                            self.synth_times.push(time);
                        }
                    }
                    if self.synth_times.len() > synth_len {
                        // Keep `synth_times` sorted descending, so that `last`/`pop`
                        // expose the minimal pending time.
                        self.synth_times.sort_by(|x,y| y.cmp(x));
                        self.synth_times.dedup();
                    }
                }
                else if interesting {
                    // We cannot process `next_time` now, and must delay it.
                    //
                    // I think we are probably only here because of an uninteresting time declared interesting,
                    // as initial interesting times are filtered to be in interval, and synthetic times are also
                    // filtered before introducing them to `self.synth_times`.
                    new_interesting.push(next_time.clone());
                    debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
                }

                // Update `meet` to track the meet of each source of times.
                meet = None;
                update_meet(&mut meet, batch_replay.meet());
                update_meet(&mut meet, input_replay.meet());
                update_meet(&mut meet, output_replay.meet());
                for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
                update_meet(&mut meet, meets_slice.first());

                // Update `times_current` by the frontier.
                if let Some(meet) = meet.as_ref() {
                    for time in self.times_current.iter_mut() {
                        *time = time.join(meet);
                    }
                }

                sort_dedup(&mut self.times_current);
            }

            // Normalize the representation of `new_interesting`, deduplicating and ordering.
            sort_dedup(new_interesting);

            (compute_counter, output_counter)
        }
    }
1063
1064    /// Updates an optional meet by an optional time.
1065    fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
1066        if let Some(time) = other {
1067            if let Some(meet) = meet.as_mut() {
1068                *meet = meet.meet(time);
1069            }
1070            if meet.is_none() {
1071                *meet = Some(time.clone());
1072            }
1073        }
1074    }
1075}