differential_dataflow/operators/reduce.rs
1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use timely::container::PushInto;
9use crate::Data;
10
11use timely::order::PartialOrder;
12use timely::progress::frontier::Antichain;
13use timely::progress::Timestamp;
14use timely::dataflow::*;
15use timely::dataflow::operators::Operator;
16use timely::dataflow::channels::pact::Pipeline;
17use timely::dataflow::operators::Capability;
18
19use crate::operators::arrange::{Arranged, TraceAgent};
20use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
21use crate::trace::cursor::CursorList;
22use crate::trace::implementations::containers::BatchContainer;
23use crate::trace::implementations::merge_batcher::container::MergerChunk;
24use crate::trace::TraceReader;
25
26/// A key-wise reduction of values in an input trace.
27///
28/// This method exists to provide reduce functionality without opinions about qualifying trace types.
29pub fn reduce_trace<G, T1, Bu, T2, L>(trace: Arranged<G, T1>, name: &str, mut logic: L) -> Arranged<G, TraceAgent<T2>>
30where
31 G: Scope<Timestamp=T1::Time>,
32 T1: TraceReader<KeyOwn: Ord> + Clone + 'static,
33 T2: for<'a> Trace<Key<'a>=T1::Key<'a>, KeyOwn=T1::KeyOwn, ValOwn: Data, Time=T1::Time> + 'static,
34 Bu: Builder<Time=T2::Time, Output = T2::Batch, Input: MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>>,
35 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn,T2::Diff)>, &mut Vec<(T2::ValOwn, T2::Diff)>)+'static,
36{
37 let mut result_trace = None;
38
39 // fabricate a data-parallel operator using the `unary_notify` pattern.
40 let stream = {
41
42 let result_trace = &mut result_trace;
43 let scope = trace.stream.scope();
44 trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {
45
46 // Acquire a logger for arrange events.
47 let logger = scope.logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);
48
49 let activator = Some(scope.activator_for(operator_info.address.clone()));
50 let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
51 // If there is default exert logic set, install it.
52 if let Some(exert_logic) = scope.config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
53 empty.set_exert_logic(exert_logic);
54 }
55
56
57 let mut source_trace = trace.trace.clone();
58
59 let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
60
61 // let mut output_trace = TraceRc::make_from(agent).0;
62 *result_trace = Some(output_reader.clone());
63
64 // let mut thinker1 = history_replay_prior::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
65 // let mut thinker = history_replay::HistoryReplayer::<V, V2, G::Timestamp, R, R2>::new();
66 let mut new_interesting_times = Vec::<G::Timestamp>::new();
67
68 // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
69 // as well as capabilities for these times (or their lower envelope, at least).
70 let mut interesting = Vec::<(T1::KeyOwn, G::Timestamp)>::new();
71 let mut capabilities = Vec::<Capability<G::Timestamp>>::new();
72
73 // buffers and logic for computing per-key interesting times "efficiently".
74 let mut interesting_times = Vec::<G::Timestamp>::new();
75
76 // Upper and lower frontiers for the pending input and output batches to process.
77 let mut upper_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
78 let mut lower_limit = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
79
80 // Output batches may need to be built piecemeal, and these temp storage help there.
81 let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
82 let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
83
84 let id = scope.index();
85
86 move |(input, _frontier), output| {
87
88 // The `reduce` operator receives fully formed batches, which each serve as an indication
89 // that the frontier has advanced to the upper bound of their description.
90 //
91 // Although we could act on each individually, several may have been sent, and it makes
92 // sense to accumulate them first to coordinate their re-evaluation. We will need to pay
93 // attention to which times need to be collected under which capability, so that we can
94 // assemble output batches correctly. We will maintain several builders concurrently, and
95 // place output updates into the appropriate builder.
96 //
97 // It turns out we must use notificators, as we cannot await empty batches from arrange to
98 // indicate progress, as the arrange may not hold the capability to send such. Instead, we
99 // must watch for progress here (and the upper bound of received batches) to tell us how
100 // far we can process work.
101 //
102 // We really want to retire all batches we receive, so we want a frontier which reflects
103 // both information from batches as well as progress information. I think this means that
104 // we keep times that are greater than or equal to a time in the other frontier, deduplicated.
105
106 let mut batch_cursors = Vec::new();
107 let mut batch_storage = Vec::new();
108
109 // Downgrade previous upper limit to be current lower limit.
110 lower_limit.clear();
111 lower_limit.extend(upper_limit.borrow().iter().cloned());
112
113 // Drain the input stream of batches, validating the contiguity of the batch descriptions and
114 // capturing a cursor for each of the batches as well as ensuring we hold a capability for the
115 // times in the batch.
116 input.for_each(|capability, batches| {
117
118 for batch in batches.drain(..) {
119 upper_limit.clone_from(batch.upper());
120 batch_cursors.push(batch.cursor());
121 batch_storage.push(batch);
122 }
123
124 // Ensure that `capabilities` covers the capability of the batch.
125 capabilities.retain(|cap| !capability.time().less_than(cap.time()));
126 if !capabilities.iter().any(|cap| cap.time().less_equal(capability.time())) {
127 capabilities.push(capability.retain(0));
128 }
129 });
130
131 // Pull in any subsequent empty batches we believe to exist.
132 source_trace.advance_upper(&mut upper_limit);
133
134 // Only if our upper limit has advanced should we do work.
135 if upper_limit != lower_limit {
136
137 // If we have no capabilities, then we (i) should not produce any outputs and (ii) could not send
138 // any produced outputs even if they were (incorrectly) produced. We cannot even send empty batches
139 // to indicate forward progress, and must hope that downstream operators look at progress frontiers
140 // as well as batch descriptions.
141 //
142 // We can (and should) advance source and output traces if `upper_limit` indicates this is possible.
143 if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {
144
145 // `interesting` contains "warnings" about keys and times that may need to be re-considered.
146 // We first extract those times from this list that lie in the interval we will process.
147 sort_dedup(&mut interesting);
148 // `exposed` contains interesting (key, time)s now below `upper_limit`
149 let mut exposed_keys = T1::KeyContainer::with_capacity(0);
150 let mut exposed_time = T1::TimeContainer::with_capacity(0);
151 // Keep pairs greater or equal to `upper_limit`, and "expose" other pairs.
152 interesting.retain(|(key, time)| {
153 if upper_limit.less_equal(time) { true } else {
154 exposed_keys.push_own(key);
155 exposed_time.push_own(time);
156 false
157 }
158 });
159
160 // Prepare an output buffer and builder for each capability.
161 //
162 // We buffer and build separately, as outputs are produced grouped by time, whereas the
163 // builder wants to see outputs grouped by value. While the per-key computation could
164 // do the re-sorting itself, buffering per-key outputs lets us double check the results
165 // against other implementations for accuracy.
166 //
167 // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
168 // this as long as it requires that there is only one capability for each message.
169 let mut buffers = Vec::<(G::Timestamp, Vec<(T2::ValOwn, G::Timestamp, T2::Diff)>)>::new();
170 let mut builders = Vec::new();
171 for cap in capabilities.iter() {
172 buffers.push((cap.time().clone(), Vec::new()));
173 builders.push(Bu::new());
174 }
175
176 let mut buffer = Bu::Input::default();
177
178 // cursors for navigating input and output traces.
179 let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
180 let source_storage = &source_storage;
181 let (mut output_cursor, output_storage): (T2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
182 let output_storage = &output_storage;
183 let (mut batch_cursor, batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);
184 let batch_storage = &batch_storage;
185
186 let mut thinker = history_replay::HistoryReplayer::new();
187
188 // We now march through the keys we must work on, drawing from `batch_cursors` and `exposed`.
189 //
190 // We only keep valid cursors (those with more data) in `batch_cursors`, and so its length
191 // indicates whether more data remain. We move through `exposed` using (index) `exposed_position`.
192 // There could perhaps be a less provocative variable name.
193 let mut exposed_position = 0;
194 while batch_cursor.key_valid(batch_storage) || exposed_position < exposed_keys.len() {
195
196 // Determine the next key we will work on; could be synthetic, could be from a batch.
197 let key1 = exposed_keys.get(exposed_position);
198 let key2 = batch_cursor.get_key(batch_storage);
199 let key = match (key1, key2) {
200 (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
201 (Some(key1), None) => key1,
202 (None, Some(key2)) => key2,
203 (None, None) => unreachable!(),
204 };
205
206 // `interesting_times` contains those times between `lower_issued` and `upper_limit`
207 // that we need to re-consider. We now populate it, but perhaps this should be left
208 // to the per-key computation, which may be able to avoid examining the times of some
209 // values (for example, in the case of min/max/topk).
210 interesting_times.clear();
211
212 // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key.
213 while exposed_keys.get(exposed_position) == Some(key) {
214 interesting_times.push(T1::owned_time(exposed_time.index(exposed_position)));
215 exposed_position += 1;
216 }
217
218 // tidy up times, removing redundancy.
219 sort_dedup(&mut interesting_times);
220
221 // do the per-key computation.
222 let _counters = thinker.compute(
223 key,
224 (&mut source_cursor, source_storage),
225 (&mut output_cursor, output_storage),
226 (&mut batch_cursor, batch_storage),
227 &mut interesting_times,
228 &mut logic,
229 &upper_limit,
230 &mut buffers[..],
231 &mut new_interesting_times,
232 );
233
234 if batch_cursor.get_key(batch_storage) == Some(key) {
235 batch_cursor.step_key(batch_storage);
236 }
237
238 // Record future warnings about interesting times (and assert they should be "future").
239 for time in new_interesting_times.drain(..) {
240 debug_assert!(upper_limit.less_equal(&time));
241 interesting.push((T1::owned_key(key), time));
242 }
243
244 // Sort each buffer by value and move into the corresponding builder.
245 // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
246 // (ii) that the buffers are time-ordered, and (iii) that the builders accept
247 // arbitrarily ordered times.
248 for index in 0 .. buffers.len() {
249 buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
250 for (val, time, diff) in buffers[index].1.drain(..) {
251 buffer.push_into(((T1::owned_key(key), val), time, diff));
252 builders[index].push(&mut buffer);
253 buffer.clear();
254 }
255 }
256 }
257
258 // We start sealing output batches from the lower limit (previous upper limit).
259 // In principle, we could update `lower_limit` itself, and it should arrive at
260 // `upper_limit` by the end of the process.
261 output_lower.clear();
262 output_lower.extend(lower_limit.borrow().iter().cloned());
263
264 // build and ship each batch (because only one capability per message).
265 for (index, builder) in builders.drain(..).enumerate() {
266
267 // Form the upper limit of the next batch, which includes all times greater
268 // than the input batch, or the capabilities from i + 1 onward.
269 output_upper.clear();
270 output_upper.extend(upper_limit.borrow().iter().cloned());
271 for capability in &capabilities[index + 1 ..] {
272 output_upper.insert(capability.time().clone());
273 }
274
275 if output_upper.borrow() != output_lower.borrow() {
276
277 let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(G::Timestamp::minimum()));
278 let batch = builder.done(description);
279
280 // ship batch to the output, and commit to the output trace.
281 output.session(&capabilities[index]).give(batch.clone());
282 output_writer.insert(batch, Some(capabilities[index].time().clone()));
283
284 output_lower.clear();
285 output_lower.extend(output_upper.borrow().iter().cloned());
286 }
287 }
288
289 // This should be true, as the final iteration introduces no capabilities, and
290 // uses exactly `upper_limit` to determine the upper bound. Good to check though.
291 assert!(output_upper.borrow() == upper_limit.borrow());
292
293 // Determine the frontier of our interesting times.
294 let mut frontier = Antichain::<G::Timestamp>::new();
295 for (_, time) in &interesting {
296 frontier.insert_ref(time);
297 }
298
299 // Update `capabilities` to reflect interesting pairs described by `frontier`.
300 let mut new_capabilities = Vec::new();
301 for time in frontier.borrow().iter() {
302 if let Some(cap) = capabilities.iter().find(|c| c.time().less_equal(time)) {
303 new_capabilities.push(cap.delayed(time));
304 }
305 else {
306 println!("{}:\tfailed to find capability less than new frontier time:", id);
307 println!("{}:\t time: {:?}", id, time);
308 println!("{}:\t caps: {:?}", id, capabilities);
309 println!("{}:\t uppr: {:?}", id, upper_limit);
310 }
311 }
312 capabilities = new_capabilities;
313
314 // ensure that observed progress is reflected in the output.
315 output_writer.seal(upper_limit.clone());
316 }
317 else {
318 output_writer.seal(upper_limit.clone());
319 }
320
321 // We only anticipate future times in advance of `upper_limit`.
322 source_trace.set_logical_compaction(upper_limit.borrow());
323 output_reader.set_logical_compaction(upper_limit.borrow());
324
325 // We will only slice the data between future batches.
326 source_trace.set_physical_compaction(upper_limit.borrow());
327 output_reader.set_physical_compaction(upper_limit.borrow());
328 }
329
330 // Exert trace maintenance if we have been so requested.
331 output_writer.exert();
332 }
333 }
334 )
335 };
336
337 Arranged { stream, trace: result_trace.unwrap() }
338}
339
340
341#[inline(never)]
342fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
343 list.dedup();
344 list.sort();
345 list.dedup();
346}
347
348trait PerKeyCompute<'a, C1, C2, C3, V>
349where
350 C1: Cursor,
351 C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
352 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
353 V: Clone + Ord,
354{
355 fn new() -> Self;
356 fn compute<L>(
357 &mut self,
358 key: C1::Key<'a>,
359 source_cursor: (&mut C1, &'a C1::Storage),
360 output_cursor: (&mut C2, &'a C2::Storage),
361 batch_cursor: (&mut C3, &'a C3::Storage),
362 times: &mut Vec<C1::Time>,
363 logic: &mut L,
364 upper_limit: &Antichain<C1::Time>,
365 outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
366 new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
367 where
368 L: FnMut(
369 C1::Key<'a>,
370 &[(C1::Val<'a>, C1::Diff)],
371 &mut Vec<(V, C2::Diff)>,
372 &mut Vec<(V, C2::Diff)>,
373 );
374}
375
376
377/// Implementation based on replaying historical and new updates together.
378mod history_replay {
379
380 use timely::progress::Antichain;
381 use timely::PartialOrder;
382
383 use crate::lattice::Lattice;
384 use crate::trace::Cursor;
385 use crate::operators::ValueHistory;
386
387 use super::{PerKeyCompute, sort_dedup};
388
389 /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
390 /// time order, maintaining consolidated representations of updates with respect to future interesting times.
391 pub struct HistoryReplayer<'a, C1, C2, C3, V>
392 where
393 C1: Cursor,
394 C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
395 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
396 V: Clone + Ord,
397 {
398 input_history: ValueHistory<'a, C1>,
399 output_history: ValueHistory<'a, C2>,
400 batch_history: ValueHistory<'a, C3>,
401 input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
402 output_buffer: Vec<(V, C2::Diff)>,
403 update_buffer: Vec<(V, C2::Diff)>,
404 output_produced: Vec<((V, C2::Time), C2::Diff)>,
405 synth_times: Vec<C1::Time>,
406 meets: Vec<C1::Time>,
407 times_current: Vec<C1::Time>,
408 temporary: Vec<C1::Time>,
409 }
410
411 impl<'a, C1, C2, C3, V> PerKeyCompute<'a, C1, C2, C3, V> for HistoryReplayer<'a, C1, C2, C3, V>
412 where
413 C1: Cursor,
414 C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
415 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
416 V: Clone + Ord,
417 {
418 fn new() -> Self {
419 HistoryReplayer {
420 input_history: ValueHistory::new(),
421 output_history: ValueHistory::new(),
422 batch_history: ValueHistory::new(),
423 input_buffer: Vec::new(),
424 output_buffer: Vec::new(),
425 update_buffer: Vec::new(),
426 output_produced: Vec::new(),
427 synth_times: Vec::new(),
428 meets: Vec::new(),
429 times_current: Vec::new(),
430 temporary: Vec::new(),
431 }
432 }
433 #[inline(never)]
434 fn compute<L>(
435 &mut self,
436 key: C1::Key<'a>,
437 (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
438 (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
439 (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
440 times: &mut Vec<C1::Time>,
441 logic: &mut L,
442 upper_limit: &Antichain<C1::Time>,
443 outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
444 new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
445 where
446 L: FnMut(
447 C1::Key<'a>,
448 &[(C1::Val<'a>, C1::Diff)],
449 &mut Vec<(V, C2::Diff)>,
450 &mut Vec<(V, C2::Diff)>,
451 )
452 {
453
454 // The work we need to perform is at times defined principally by the contents of `batch_cursor`
455 // and `times`, respectively "new work we just received" and "old times we were warned about".
456 //
457 // Our first step is to identify these times, so that we can use them to restrict the amount of
458 // information we need to recover from `input` and `output`; as all times of interest will have
459 // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
460 // loaded times by performing the lattice `join` with this value.
461
462 // Load the batch contents.
463 let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));
464
465 // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
466 // can be used to advance other historical times, which may consolidate their representation. As
467 // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
468 // history forward.
469
470 self.meets.clear();
471 self.meets.extend(times.iter().cloned());
472 for index in (1 .. self.meets.len()).rev() {
473 self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
474 }
475
476 // Determine the meet of times in `batch` and `times`.
477 let mut meet = None;
478 update_meet(&mut meet, self.meets.get(0));
479 update_meet(&mut meet, batch_replay.meet());
480 // if let Some(time) = self.meets.get(0) {
481 // meet = match meet {
482 // None => Some(self.meets[0].clone()),
483 // Some(x) => Some(x.meet(&self.meets[0])),
484 // };
485 // }
486 // if let Some(time) = batch_replay.meet() {
487 // meet = match meet {
488 // None => Some(time.clone()),
489 // Some(x) => Some(x.meet(&time)),
490 // };
491 // }
492
493 // Having determined the meet, we can load the input and output histories, where we
494 // advance all times by joining them with `meet`. The resulting times are more compact
495 // and guaranteed to accumulate identically for times greater or equal to `meet`.
496
497 // Load the input and output histories.
498 let mut input_replay = if let Some(meet) = meet.as_ref() {
499 self.input_history.replay_key(source_cursor, source_storage, key, |time| {
500 let mut time = C1::owned_time(time);
501 time.join_assign(meet);
502 time
503 })
504 }
505 else {
506 self.input_history.replay_key(source_cursor, source_storage, key, |time| C1::owned_time(time))
507 };
508 let mut output_replay = if let Some(meet) = meet.as_ref() {
509 self.output_history.replay_key(output_cursor, output_storage, key, |time| {
510 let mut time = C2::owned_time(time);
511 time.join_assign(meet);
512 time
513 })
514 }
515 else {
516 self.output_history.replay_key(output_cursor, output_storage, key, |time| C2::owned_time(time))
517 };
518
519 self.synth_times.clear();
520 self.times_current.clear();
521 self.output_produced.clear();
522
523 // The frontier of times we may still consider.
524 // Derived from frontiers of our update histories, supplied times, and synthetic times.
525
526 let mut times_slice = ×[..];
527 let mut meets_slice = &self.meets[..];
528
529 let mut compute_counter = 0;
530 let mut output_counter = 0;
531
532 // We have candidate times from `batch` and `times`, as well as times identified by either
533 // `input` or `output`. Finally, we may have synthetic times produced as the join of times
534 // we consider in the course of evaluation. As long as any of these times exist, we need to
535 // keep examining times.
536 while let Some(next_time) = [ batch_replay.time(),
537 times_slice.first(),
538 input_replay.time(),
539 output_replay.time(),
540 self.synth_times.last(),
541 ].iter().cloned().flatten().min().cloned() {
542
543 // Advance input and output history replayers. This marks applicable updates as active.
544 input_replay.step_while_time_is(&next_time);
545 output_replay.step_while_time_is(&next_time);
546
547 // One of our goals is to determine if `next_time` is "interesting", meaning whether we
548 // have any evidence that we should re-evaluate the user logic at this time. For a time
549 // to be "interesting" it would need to be the join of times that include either a time
550 // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
551
552 // Advance batch history, and capture whether an update exists at `next_time`.
553 let mut interesting = batch_replay.step_while_time_is(&next_time);
554 if interesting {
555 if let Some(meet) = meet.as_ref() {
556 batch_replay.advance_buffer_by(meet);
557 }
558 }
559
560 // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
561 while self.synth_times.last() == Some(&next_time) {
562 // We don't know enough about `next_time` to avoid putting it in to `times_current`.
563 // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
564 self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
565 interesting = true;
566 }
567 while times_slice.first() == Some(&next_time) {
568 // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
569 self.times_current.push(times_slice[0].clone());
570 times_slice = ×_slice[1..];
571 meets_slice = &meets_slice[1..];
572 interesting = true;
573 }
574
575 // Times could also be interesting if an interesting time is less than them, as they would join
576 // and become the time itself. They may not equal the current time because whatever frontier we
577 // are tracking may not have advanced far enough.
578 // TODO: `batch_history` may or may not be super compact at this point, and so this check might
579 // yield false positives if not sufficiently compact. Maybe we should into this and see.
580 interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
581 interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
582
583 // We should only process times that are not in advance of `upper_limit`.
584 //
585 // We have no particular guarantee that known times will not be in advance of `upper_limit`.
586 // We may have the guarantee that synthetic times will not be, as we test against the limit
587 // before we add the time to `synth_times`.
588 if !upper_limit.less_equal(&next_time) {
589
590 // We should re-evaluate the computation if this is an interesting time.
591 // If the time is uninteresting (and our logic is sound) it is not possible for there to be
592 // output produced. This sounds like a good test to have for debug builds!
593 if interesting {
594
595 compute_counter += 1;
596
597 // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
598 debug_assert!(self.input_buffer.is_empty());
599 meet.as_ref().map(|meet| input_replay.advance_buffer_by(meet));
600 for &((value, ref time), ref diff) in input_replay.buffer().iter() {
601 if time.less_equal(&next_time) {
602 self.input_buffer.push((value, diff.clone()));
603 }
604 else {
605 self.temporary.push(next_time.join(time));
606 }
607 }
608 for &((value, ref time), ref diff) in batch_replay.buffer().iter() {
609 if time.less_equal(&next_time) {
610 self.input_buffer.push((value, diff.clone()));
611 }
612 else {
613 self.temporary.push(next_time.join(time));
614 }
615 }
616 crate::consolidation::consolidate(&mut self.input_buffer);
617
618 meet.as_ref().map(|meet| output_replay.advance_buffer_by(meet));
619 for &((value, ref time), ref diff) in output_replay.buffer().iter() {
620 if time.less_equal(&next_time) {
621 self.output_buffer.push((C2::owned_val(value), diff.clone()));
622 }
623 else {
624 self.temporary.push(next_time.join(time));
625 }
626 }
627 for &((ref value, ref time), ref diff) in self.output_produced.iter() {
628 if time.less_equal(&next_time) {
629 self.output_buffer.push(((*value).to_owned(), diff.clone()));
630 }
631 else {
632 self.temporary.push(next_time.join(time));
633 }
634 }
635 crate::consolidation::consolidate(&mut self.output_buffer);
636
637 // Apply user logic if non-empty input and see what happens!
638 if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
639 logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
640 self.input_buffer.clear();
641 self.output_buffer.clear();
642 }
643
644 // output_replay.advance_buffer_by(&meet);
645 // for &((ref value, ref time), diff) in output_replay.buffer().iter() {
646 // if time.less_equal(&next_time) {
647 // self.output_buffer.push(((*value).clone(), -diff));
648 // }
649 // else {
650 // self.temporary.push(next_time.join(time));
651 // }
652 // }
653 // for &((ref value, ref time), diff) in self.output_produced.iter() {
654 // if time.less_equal(&next_time) {
655 // self.output_buffer.push(((*value).clone(), -diff));
656 // }
657 // else {
658 // self.temporary.push(next_time.join(&time));
659 // }
660 // }
661
662 // Having subtracted output updates from user output, consolidate the results to determine
663 // if there is anything worth reporting. Note: this also orders the results by value, so
664 // that could make the above merging plan even easier.
665 crate::consolidation::consolidate(&mut self.update_buffer);
666
667 // Stash produced updates into both capability-indexed buffers and `output_produced`.
668 // The two locations are important, in that we will compact `output_produced` as we move
669 // through times, but we cannot compact the output buffers because we need their actual
670 // times.
671 if !self.update_buffer.is_empty() {
672
673 output_counter += 1;
674
675 // We *should* be able to find a capability for `next_time`. Any thing else would
676 // indicate a logical error somewhere along the way; either we release a capability
677 // we should have kept, or we have computed the output incorrectly (or both!)
678 let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
679 let idx = outputs.len() - idx.expect("failed to find index") - 1;
680 for (val, diff) in self.update_buffer.drain(..) {
681 self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
682 outputs[idx].1.push((val, next_time.clone(), diff));
683 }
684
685 // Advance times in `self.output_produced` and consolidate the representation.
686 // NOTE: We only do this when we add records; it could be that there are situations
687 // where we want to consolidate even without changes (because an initially
688 // large collection can now be collapsed).
689 if let Some(meet) = meet.as_ref() {
690 for entry in &mut self.output_produced {
691 (entry.0).1 = (entry.0).1.join(meet);
692 }
693 }
694 crate::consolidation::consolidate(&mut self.output_produced);
695 }
696 }
697
698 // Determine synthetic interesting times.
699 //
700 // Synthetic interesting times are produced differently for interesting and uninteresting
701 // times. An uninteresting time must join with an interesting time to become interesting,
702 // which means joins with `self.batch_history` and `self.times_current`. I think we can
703 // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
704 // joined against everything.
705
706 // Any time, even uninteresting times, must be joined with the current accumulation of
707 // batch times as well as the current accumulation of `times_current`.
708 for &((_, ref time), _) in batch_replay.buffer().iter() {
709 if !time.less_equal(&next_time) {
710 self.temporary.push(time.join(&next_time));
711 }
712 }
713 for time in self.times_current.iter() {
714 if !time.less_equal(&next_time) {
715 self.temporary.push(time.join(&next_time));
716 }
717 }
718
719 sort_dedup(&mut self.temporary);
720
721 // Introduce synthetic times, and re-organize if we add any.
722 let synth_len = self.synth_times.len();
723 for time in self.temporary.drain(..) {
724 // We can either service `join` now, or must delay for the future.
725 if upper_limit.less_equal(&time) {
726 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
727 new_interesting.push(time);
728 }
729 else {
730 self.synth_times.push(time);
731 }
732 }
733 if self.synth_times.len() > synth_len {
734 self.synth_times.sort_by(|x,y| y.cmp(x));
735 self.synth_times.dedup();
736 }
737 }
738 else if interesting {
739 // We cannot process `next_time` now, and must delay it.
740 //
741 // I think we are probably only here because of an uninteresting time declared interesting,
742 // as initial interesting times are filtered to be in interval, and synthetic times are also
743 // filtered before introducing them to `self.synth_times`.
744 new_interesting.push(next_time.clone());
745 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
746 }
747
748
749 // Update `meet` to track the meet of each source of times.
750 meet = None;//T::maximum();
751 update_meet(&mut meet, batch_replay.meet());
752 update_meet(&mut meet, input_replay.meet());
753 update_meet(&mut meet, output_replay.meet());
754 for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
755 // if let Some(time) = batch_replay.meet() { meet = meet.meet(time); }
756 // if let Some(time) = input_replay.meet() { meet = meet.meet(time); }
757 // if let Some(time) = output_replay.meet() { meet = meet.meet(time); }
758 // for time in self.synth_times.iter() { meet = meet.meet(time); }
759 update_meet(&mut meet, meets_slice.first());
760 // if let Some(time) = meets_slice.first() { meet = meet.meet(time); }
761
762 // Update `times_current` by the frontier.
763 if let Some(meet) = meet.as_ref() {
764 for time in self.times_current.iter_mut() {
765 *time = time.join(meet);
766 }
767 }
768
769 sort_dedup(&mut self.times_current);
770 }
771
772 // Normalize the representation of `new_interesting`, deduplicating and ordering.
773 sort_dedup(new_interesting);
774
775 (compute_counter, output_counter)
776 }
777 }
778
779 /// Updates an optional meet by an optional time.
780 fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
781 if let Some(time) = other {
782 if let Some(meet) = meet.as_mut() {
783 *meet = meet.meet(time);
784 }
785 if meet.is_none() {
786 *meet = Some(time.clone());
787 }
788 }
789 }
790}