differential_dataflow/operators/reduce.rs
1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use crate::Data;
9
10use timely::progress::frontier::Antichain;
11use timely::progress::Timestamp;
12use timely::dataflow::operators::Operator;
13use timely::dataflow::channels::pact::Pipeline;
14
15use crate::operators::arrange::{Arranged, TraceAgent};
16use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
17use crate::trace::cursor::CursorList;
18use crate::trace::implementations::containers::BatchContainer;
19use crate::trace::TraceReader;
20
21/// A key-wise reduction of values in an input trace.
22///
23/// This method exists to provide reduce functionality without opinions about qualifying trace types.
24///
25/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output,
/// and populate its final argument with whatever it feels to be appropriate updates. The behavior and
27/// correctness of the implementation rely on this making sense, and e.g. ideally the updates would if
28/// applied to the tentative output bring it in line with some function applied to the input.
29///
30/// The `push` closure is expected to clear its first argument, then populate it with the key and drain
31/// the value updates, as appropriate for the container. It is critical that it clear the container as
32/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one
33/// key's computation to another, and will likely introduce non-determinism.
pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent<Tr2>>
where
    Tr1: TraceReader + Clone + 'static,
    Tr2: for<'a> Trace<Key<'a>=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static,
    Bu: Builder<Time=Tr2::Time, Output = Tr2::Batch, Input: Default>,
    L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
    P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
{
    // Populated inside the operator constructor closure below, and extracted once
    // `unary_frontier` has run the constructor; `unwrap` at the end relies on this.
    let mut result_trace = None;

    // fabricate a data-parallel operator using the `unary_notify` pattern.
    let stream = {

        let result_trace = &mut result_trace;
        let scope = trace.stream.scope();
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for arrange events.
            let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            // The activator lets the output trace schedule this operator for merge work.
            let activator = Some(scope.activator_for(operator_info.address.clone()));
            let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = scope.worker().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }

            // Private handle to the input trace; compaction is advanced as we retire work.
            let mut source_trace = trace.trace.clone();

            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // `output_reader` is the result of the operator; hand a clone back to the caller.
            *result_trace = Some(output_reader.clone());

            let mut new_interesting_times = Vec::<Tr1::Time>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // sorted by (key, time), as well as capabilities for the lower envelope of the times.
            let mut pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut pending_time = Tr1::TimeContainer::with_capacity(0);
            // Double buffers for the above: repopulated each round, then swapped in.
            let mut next_pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut next_pending_time = Tr1::TimeContainer::with_capacity(0);
            let mut capabilities = timely::dataflow::operators::CapabilitySet::<Tr1::Time>::default();

            // buffers and logic for computing per-key interesting times "efficiently".
            let mut interesting_times = Vec::<Tr1::Time>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal, and these temp storage help there.
            let mut output_upper = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            move |(input, frontier), output| {

                // The operator receives input batches, which it treats as contiguous and will collect and
                // then process as one batch. It captures the input frontier from the batches, from the upstream
                // trace, and from the input frontier, and retires the work through that interval.
                //
                // Reduce may retain capabilities and need to perform work and produce output at times that
                // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)`
                // may result in outputs at `(1, 1)` as well, even with no input at that time.

                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain input batches in order, capturing capabilities and the last upper.
                input.for_each(|capability, batches| {
                    capabilities.insert(capability.retain(0));
                    for batch in batches.drain(..) {
                        // Batches arrive in order, so the last `upper` observed is the interval's upper bound.
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);
                // Incorporate the input frontier guarantees as well.
                let mut joined = Antichain::new();
                crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined);
                upper_limit = joined;

                // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed.
                if upper_limit != lower_limit {

                    // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs,
                    // and could not transmit the outputs even if they were (incorrectly) non-zero.
                    // We do have maintenance work after this logic, and should not fuse this test with the above test.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // cursors for navigating input and output traces.
                        let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);

                        // Prepare an output buffer and builder for each capability.
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(Tr1::Time, Vec<(Tr2::ValOwn, Tr1::Time, Tr2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }
                        // Temporary staging for output building.
                        let mut buffer = Bu::Input::default();

                        // Reuseable state for performing the computation.
                        let mut thinker = history_replay::HistoryReplayer::new();

                        // Merge the received batch cursor with our list of interesting (key, time) moments.
                        // The interesting moments need to be in the interval to prompt work.

                        // March through the keys we must work on, merging `batch_cursors` and `exposed`.
                        let mut pending_pos = 0;
                        while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() {

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            let key1 = pending_keys.get(pending_pos);
                            let key2 = batch_cursor.get_key(batch_storage);
                            // Both sources are key-ordered, so the minimum of their heads is next.
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None) => key1,
                                (None, Some(key2)) => key2,
                                (None, None) => unreachable!(),
                            };

                            // Populate `interesting_times` with interesting times not beyond `upper_limit`.
                            // TODO: This could just be `pending_time` and indexes within `lower .. upper`.
                            let prior_pos = pending_pos;
                            interesting_times.clear();
                            while pending_keys.get(pending_pos) == Some(key) {
                                let owned_time = Tr1::owned_time(pending_time.index(pending_pos));
                                if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); }
                                pending_pos += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // If there are new updates, or pending times, we must investigate!
                            if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() {

                                // do the per-key computation.
                                thinker.compute(
                                    key,
                                    (&mut source_cursor, source_storage),
                                    (&mut output_cursor, output_storage),
                                    (&mut batch_cursor, batch_storage),
                                    &interesting_times,
                                    &mut logic,
                                    &upper_limit,
                                    &mut buffers[..],
                                    &mut new_interesting_times,
                                );

                                // Advance the cursor if this key, so that the loop's validity check registers the work as done.
                                if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); }

                                // Merge novel pending times with any prior pending times we did not process.
                                // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted.
                                for pos in prior_pos .. pending_pos {
                                    let owned_time = Tr1::owned_time(pending_time.index(pos));
                                    if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); }
                                }
                                sort_dedup(&mut new_interesting_times);
                                for time in new_interesting_times.drain(..) {
                                    next_pending_keys.push_ref(key);
                                    next_pending_time.push_own(&time);
                                }

                                // Sort each buffer by value and move into the corresponding builder.
                                // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                                //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                                //       arbitrarily ordered times.
                                for index in 0 .. buffers.len() {
                                    buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                    // `push` is expected to clear `buffer` before filling it (see fn docs).
                                    push(&mut buffer, key, &mut buffers[index].1);
                                    buffers[index].1.clear();
                                    builders[index].push(&mut buffer);

                                }
                            }
                            else {
                                // copy over the pending key and times.
                                for pos in prior_pos .. pending_pos {
                                    next_pending_keys.push_ref(pending_keys.index(pos));
                                    next_pending_time.push_ref(pending_time.index(pos));
                                }
                            }
                        }
                        // Drop to avoid lifetime issues that would lock `pending_{keys, time}`.
                        drop(thinker);

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert_ref(capability.time());
                            }

                            // An empty interval would describe an empty batch; skip it.
                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(Tr1::Time::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                // The next batch starts where this one ended.
                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }
                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Refresh pending keys and times, then downgrade capabilities to the frontier of times.
                        pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys);
                        pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

                        // Update `capabilities` to reflect pending times.
                        let mut frontier = Antichain::<Tr1::Time>::new();
                        let mut owned_time = Tr1::Time::minimum();
                        for pos in 0 .. pending_time.len() {
                            Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
                            frontier.insert_ref(&owned_time);
                        }
                        capabilities.downgrade(frontier);
                    }

                    // ensure that observed progress is reflected in the output.
                    output_writer.seal(upper_limit.clone());

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
        )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
303
304
/// Sorts `list` and removes all duplicate elements, leaving it sorted and deduplicated.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    // Cheap pre-pass: drop runs of consecutive duplicates before sorting,
    // reducing the amount of data the sort must move.
    list.dedup();
    // `sort_unstable` is faster and allocation-free; stability is irrelevant here
    // because equal elements are indistinguishable under `Ord` and are removed
    // by the `dedup` immediately after.
    list.sort_unstable();
    list.dedup();
}
311
312/// Implementation based on replaying historical and new updates together.
313mod history_replay {
314
315 use timely::progress::Antichain;
316 use timely::PartialOrder;
317
318 use crate::lattice::Lattice;
319 use crate::trace::Cursor;
320 use crate::operators::ValueHistory;
321
322 use super::sort_dedup;
323
    /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
    /// time order, maintaining consolidated representations of updates with respect to future interesting times.
    pub struct HistoryReplayer<'a, C1, C2, C3, V>
    where
        C1: Cursor,
        C2: Cursor<Key<'a> = C1::Key<'a>, Time = C1::Time>,
        C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
        V: Clone + Ord,
    {
        // Replayable per-key history of updates from the source (input) trace.
        input_history: ValueHistory<'a, C1>,
        // Replayable per-key history of updates from the output trace.
        output_history: ValueHistory<'a, C2>,
        // Replayable per-key history of updates from the newly received batches.
        batch_history: ValueHistory<'a, C3>,
        // Scratch: the input collection accumulated at the time under consideration.
        input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
        // Scratch: the tentative output collection accumulated at the time under consideration.
        output_buffer: Vec<(V, C2::Diff)>,
        // Scratch: corrective updates proposed by user logic, consolidated before being emitted.
        update_buffer: Vec<(V, C2::Diff)>,
        // Updates produced during this invocation, retained so later times can fold them
        // into their tentative output accumulation.
        output_produced: Vec<((V, C2::Time), C2::Diff)>,
        // Synthetic interesting times (joins of observed times), kept sorted in reverse
        // so that the next time to process is at the back.
        synth_times: Vec<C1::Time>,
        // `meets[i]` holds the meet of the suffix `times[i..]` of the supplied times.
        meets: Vec<C1::Time>,
        // Times already visited that may still render future times interesting via joins.
        times_current: Vec<C1::Time>,
        // Scratch list of candidate future times, drained each iteration.
        temporary: Vec<C1::Time>,
    }
345
346 impl<'a, C1, C2, C3, V> HistoryReplayer<'a, C1, C2, C3, V>
347 where
348 C1: Cursor,
349 C2: for<'b> Cursor<Key<'a> = C1::Key<'a>, ValOwn = V, Time = C1::Time>,
350 C3: Cursor<Key<'a> = C1::Key<'a>, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
351 V: Clone + Ord,
352 {
353 pub fn new() -> Self {
354 HistoryReplayer {
355 input_history: ValueHistory::new(),
356 output_history: ValueHistory::new(),
357 batch_history: ValueHistory::new(),
358 input_buffer: Vec::new(),
359 output_buffer: Vec::new(),
360 update_buffer: Vec::new(),
361 output_produced: Vec::new(),
362 synth_times: Vec::new(),
363 meets: Vec::new(),
364 times_current: Vec::new(),
365 temporary: Vec::new(),
366 }
367 }
        /// Replays the source, output, and batch histories for `key` in time order, applying
        /// `logic` at each interesting time not beyond `upper_limit`.
        ///
        /// Produced updates are appended to the capability-indexed `outputs` buffers; times that
        /// must be revisited in a later invocation are pushed onto `new_interesting`.
        #[inline(never)]
        pub fn compute<L>(
            &mut self,
            key: C1::Key<'a>,
            (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
            (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
            (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
            times: &Vec<C1::Time>,
            logic: &mut L,
            upper_limit: &Antichain<C1::Time>,
            outputs: &mut [(C2::Time, Vec<(V, C2::Time, C2::Diff)>)],
            new_interesting: &mut Vec<C1::Time>)
        where
            L: FnMut(
                C1::Key<'a>,
                &[(C1::Val<'a>, C1::Diff)],
                &mut Vec<(V, C2::Diff)>,
                &mut Vec<(V, C2::Diff)>,
            )
        {

            // The work we need to perform is at times defined principally by the contents of `batch_cursor`
            // and `times`, respectively "new work we just received" and "old times we were warned about".
            //
            // Our first step is to identify these times, so that we can use them to restrict the amount of
            // information we need to recover from `input` and `output`; as all times of interest will have
            // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
            // loaded times by performing the lattice `join` with this value.

            // Load the batch contents.
            let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, |time| C3::owned_time(time));

            // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
            // can be used to advance other historical times, which may consolidate their representation. As
            // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
            // history forward.

            self.meets.clear();
            self.meets.extend(times.iter().cloned());
            // After this backward pass, `meets[i]` is the meet of `times[i..]`.
            for index in (1 .. self.meets.len()).rev() {
                self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
            }

            // Determine the meet of times in `batch` and `times`.
            let mut meet = None;
            update_meet(&mut meet, self.meets.get(0));
            update_meet(&mut meet, batch_replay.meet());

            // Having determined the meet, we can load the input and output histories, where we
            // advance all times by joining them with `meet`. The resulting times are more compact
            // and guaranteed to accumulate identically for times greater or equal to `meet`.

            // Load the input and output histories.
            let mut input_replay =
                self.input_history.replay_key(source_cursor, source_storage, key, |time| {
                    let mut time = C1::owned_time(time);
                    if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
                    time
                });
            let mut output_replay =
                self.output_history.replay_key(output_cursor, output_storage, key, |time| {
                    let mut time = C2::owned_time(time);
                    if let Some(meet) = meet.as_ref() { time.join_assign(meet); }
                    time
                });

            // Reset per-key state; the buffers are reused across keys.
            self.synth_times.clear();
            self.times_current.clear();
            self.output_produced.clear();

            // The frontier of times we may still consider.
            // Derived from frontiers of our update histories, supplied times, and synthetic times.

            let mut times_slice = &times[..];
            let mut meets_slice = &self.meets[..];

            // We have candidate times from `batch` and `times`, as well as times identified by either
            // `input` or `output`. Finally, we may have synthetic times produced as the join of times
            // we consider in the course of evaluation. As long as any of these times exist, we need to
            // keep examining times.
            while let Some(next_time) = [ batch_replay.time(),
                                          times_slice.first(),
                                          input_replay.time(),
                                          output_replay.time(),
                                          self.synth_times.last(),
            ].into_iter().flatten().min().cloned() {

                // Advance input and output history replayers. This marks applicable updates as active.
                input_replay.step_while_time_is(&next_time);
                output_replay.step_while_time_is(&next_time);

                // One of our goals is to determine if `next_time` is "interesting", meaning whether we
                // have any evidence that we should re-evaluate the user logic at this time. For a time
                // to be "interesting" it would need to be the join of times that include either a time
                // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.

                // Advance batch history, and capture whether an update exists at `next_time`.
                let mut interesting = batch_replay.step_while_time_is(&next_time);
                if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } }

                // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
                // NB: `synth_times` is sorted in reverse, so its next time is at the back.
                while self.synth_times.last() == Some(&next_time) {
                    // We don't know enough about `next_time` to avoid putting it in to `times_current`.
                    // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
                    self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
                    interesting = true;
                }
                while times_slice.first() == Some(&next_time) {
                    // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
                    self.times_current.push(times_slice[0].clone());
                    times_slice = &times_slice[1..];
                    meets_slice = &meets_slice[1..];
                    interesting = true;
                }

                // Times could also be interesting if an interesting time is less than them, as they would join
                // and become the time itself. They may not equal the current time because whatever frontier we
                // are tracking may not have advanced far enough.
                // TODO: `batch_history` may or may not be super compact at this point, and so this check might
                //       yield false positives if not sufficiently compact. Maybe we should look into this and see.
                interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
                interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));

                // We should only process times that are not in advance of `upper_limit`.
                //
                // We have no particular guarantee that known times will not be in advance of `upper_limit`.
                // We may have the guarantee that synthetic times will not be, as we test against the limit
                // before we add the time to `synth_times`.
                if !upper_limit.less_equal(&next_time) {

                    // We should re-evaluate the computation if this is an interesting time.
                    // If the time is uninteresting (and our logic is sound) it is not possible for there to be
                    // output produced. This sounds like a good test to have for debug builds!
                    if interesting {

                        // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
                        debug_assert!(self.input_buffer.is_empty());
                        if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) };
                        for ((value, time), diff) in input_replay.buffer().iter() {
                            // Updates not yet applicable at `next_time` instead seed future candidate times.
                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
                            else { self.temporary.push(next_time.join(time)); }
                        }
                        for ((value, time), diff) in batch_replay.buffer().iter() {
                            if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
                            else { self.temporary.push(next_time.join(time)); }
                        }
                        crate::consolidation::consolidate(&mut self.input_buffer);

                        // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use).
                        if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) };
                        for ((value, time), diff) in output_replay.buffer().iter() {
                            if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); }
                            else { self.temporary.push(next_time.join(time)); }
                        }
                        // Also fold in updates we produced earlier in this very invocation.
                        for ((value, time), diff) in self.output_produced.iter() {
                            if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); }
                            else { self.temporary.push(next_time.join(time)); }
                        }
                        crate::consolidation::consolidate(&mut self.output_buffer);

                        // Apply user logic if non-empty input or output and see what happens!
                        if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
                            logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
                            self.input_buffer.clear();
                            self.output_buffer.clear();

                            // Having subtracted output updates from user output, consolidate the results to determine
                            // if there is anything worth reporting. Note: this also orders the results by value, so
                            // that could make the above merging plan even easier.
                            //
                            // Stash produced updates into both capability-indexed buffers and `output_produced`.
                            // The two locations are important, in that we will compact `output_produced` as we move
                            // through times, but we cannot compact the output buffers because we need their actual
                            // times.
                            crate::consolidation::consolidate(&mut self.update_buffer);
                            if !self.update_buffer.is_empty() {

                                // We *should* be able to find a capability for `next_time`. Any thing else would
                                // indicate a logical error somewhere along the way; either we release a capability
                                // we should have kept, or we have computed the output incorrectly (or both!)
                                // Scan from the back so we land on the greatest-indexed qualifying capability.
                                let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
                                let idx = outputs.len() - idx.expect("failed to find index") - 1;
                                for (val, diff) in self.update_buffer.drain(..) {
                                    self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
                                    outputs[idx].1.push((val, next_time.clone(), diff));
                                }

                                // Advance times in `self.output_produced` and consolidate the representation.
                                // NOTE: We only do this when we add records; it could be that there are situations
                                //       where we want to consolidate even without changes (because an initially
                                //       large collection can now be collapsed).
                                if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } }
                                crate::consolidation::consolidate(&mut self.output_produced);
                            }
                        }
                    }

                    // Determine synthetic interesting times.
                    //
                    // Synthetic interesting times are produced differently for interesting and uninteresting
                    // times. An uninteresting time must join with an interesting time to become interesting,
                    // which means joins with `self.batch_history` and `self.times_current`. I think we can
                    // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
                    // joined against everything.

                    // Any time, even uninteresting times, must be joined with the current accumulation of
                    // batch times as well as the current accumulation of `times_current`.
                    self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
                    self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
                    sort_dedup(&mut self.temporary);

                    // Introduce synthetic times, and re-organize if we add any.
                    let synth_len = self.synth_times.len();
                    for time in self.temporary.drain(..) {
                        // We can either service `join` now, or must delay for the future.
                        if upper_limit.less_equal(&time) {
                            debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
                            new_interesting.push(time);
                        }
                        else {
                            self.synth_times.push(time);
                        }
                    }
                    if self.synth_times.len() > synth_len {
                        // Maintain the reverse ordering so the next time stays at the back.
                        self.synth_times.sort_by(|x,y| y.cmp(x));
                        self.synth_times.dedup();
                    }
                }
                else if interesting {
                    // We cannot process `next_time` now, and must delay it.
                    //
                    // I think we are probably only here because of an uninteresting time declared interesting,
                    // as initial interesting times are filtered to be in interval, and synthetic times are also
                    // filtered before introducing them to `self.synth_times`.
                    new_interesting.push(next_time.clone());
                    debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
                }

                // Update `meet` to track the meet of each source of times.
                meet = None;
                update_meet(&mut meet, batch_replay.meet());
                update_meet(&mut meet, input_replay.meet());
                update_meet(&mut meet, output_replay.meet());
                for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
                update_meet(&mut meet, meets_slice.first());

                // Update `times_current` by the frontier.
                if let Some(meet) = meet.as_ref() {
                    for time in self.times_current.iter_mut() {
                        *time = time.join(meet);
                    }
                }

                sort_dedup(&mut self.times_current);
            }

            // Normalize the representation of `new_interesting`, deduplicating and ordering.
            sort_dedup(new_interesting);
        }
627 }
628
629 /// Updates an optional meet by an optional time.
630 fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
631 if let Some(time) = other {
632 if let Some(meet) = meet.as_mut() { meet.meet_assign(time); }
633 else { *meet = Some(time.clone()); }
634 }
635 }
636}