differential_dataflow/operators/reduce.rs
1//! Applies a reduction function on records grouped by key.
2//!
3//! The `reduce` operator acts on `(key, val)` data.
4//! Records with the same key are grouped together, and a user-supplied reduction function is applied
5//! to the key and the list of values.
6//! The function is expected to populate a list of output values.
7
8use crate::Data;
9
10use timely::progress::frontier::Antichain;
11use timely::progress::Timestamp;
12use timely::dataflow::operators::Operator;
13use timely::dataflow::channels::pact::Pipeline;
14
15use crate::operators::arrange::{Arranged, TraceAgent};
16use crate::trace::{BatchReader, Cursor, Trace, Builder, ExertionLogic, Description};
17use crate::trace::cursor::CursorList;
18use crate::trace::implementations::containers::BatchContainer;
19use crate::trace::TraceReader;
20
/// A key-wise reduction of values in an input trace.
///
/// This method exists to provide reduce functionality without opinions about qualifying trace types.
///
/// The `logic` closure is expected to take a key, accumulated input, and tentative accumulated output,
/// and populate its final argument with whatever it feels to be appropriate updates. The behavior and
/// correctness of the implementation rely on this making sense: ideally, the updates, if applied to the
/// tentative output, would bring it in line with some function applied to the input.
///
/// The `push` closure is expected to clear its first argument, then populate it with the key and drain
/// the value updates, as appropriate for the container. It is critical that it clear the container as
/// the operator has no ability to do this otherwise, and failing to do so represents a leak from one
/// key's computation to another, and will likely introduce non-determinism.
///
/// * `trace`: the input arrangement; its stream drives the operator and its trace supplies history.
/// * `name`: the name given to the constructed operator.
///
/// Returns an arrangement whose stream carries the output batches and whose trace is a reader for
/// the output trace (of type `Tr2`, with batches built by `Bu`).
pub fn reduce_trace<'scope, Tr1, Bu, Tr2, L, P>(trace: Arranged<'scope, Tr1>, name: &str, mut logic: L, mut push: P) -> Arranged<'scope, TraceAgent<Tr2>>
where
    Tr1: TraceReader + 'static,
    Tr2: for<'a> Trace<Key<'a>=Tr1::Key<'a>, ValOwn: Data, Time = Tr1::Time> + 'static,
    Bu: Builder<Time=Tr2::Time, Output = Tr2::Batch, Input: Default>,
    L: FnMut(Tr1::Key<'_>, &[(Tr1::Val<'_>, Tr1::Diff)], &mut Vec<(Tr2::ValOwn,Tr2::Diff)>, &mut Vec<(Tr2::ValOwn, Tr2::Diff)>)+'static,
    P: FnMut(&mut Bu::Input, Tr1::Key<'_>, &mut Vec<(Tr2::ValOwn, Tr2::Time, Tr2::Diff)>) + 'static,
{
    // Set by the operator constructor below to a reader of the output trace.
    let mut result_trace = None;

    // Fabricate a data-parallel operator using the `unary_frontier` pattern.
    let stream = {

        let mut source_trace = trace.trace;
        let result_trace = &mut result_trace;
        let scope = trace.stream.scope();
        trace.stream.unary_frontier(Pipeline, name, move |_capability, operator_info| {

            // Acquire a logger for arrange events.
            let logger = scope.worker().logger_for::<crate::logging::DifferentialEventBuilder>("differential/arrange").map(Into::into);

            // Create the empty output trace. It is handed an activator for this operator
            // (presumably so trace maintenance can reschedule it; not visible here).
            let activator = Some(scope.activator_for(std::rc::Rc::clone(&operator_info.address)));
            let mut empty = Tr2::new(operator_info.clone(), logger.clone(), activator);
            // If there is default exert logic set, install it.
            if let Some(exert_logic) = scope.worker().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
                empty.set_exert_logic(exert_logic);
            }

            // A reader for the output trace (also used to read prior output) and a writer used to commit batches.
            let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);

            // Hand a clone of the reader back to the enclosing function for the returned `Arranged`.
            *result_trace = Some(output_reader.clone());

            // Scratch space into which per-key computation reports times at or beyond `upper_limit`
            // that must be revisited in a later invocation.
            let mut new_interesting_times = Vec::<Tr1::Time>::new();

            // Our implementation maintains a list of outstanding `(key, time)` synthetic interesting times,
            // sorted by (key, time), as well as capabilities for the lower envelope of the times.
            // The `next_*` containers accumulate the list for the following invocation.
            let mut pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut pending_time = Tr1::TimeContainer::with_capacity(0);
            let mut next_pending_keys = Tr1::KeyContainer::with_capacity(0);
            let mut next_pending_time = Tr1::TimeContainer::with_capacity(0);
            let mut capabilities = timely::dataflow::operators::CapabilitySet::<Tr1::Time>::default();

            // Buffer of the current key's pending times that fall within the interval being retired.
            let mut interesting_times = Vec::<Tr1::Time>::new();

            // Upper and lower frontiers for the pending input and output batches to process.
            let mut upper_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut lower_limit = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            // Output batches may need to be built piecemeal (one per capability); these frontiers
            // track the bounds of the batch currently being sealed.
            let mut output_upper = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());
            let mut output_lower = Antichain::from_elem(<Tr1::Time as timely::progress::Timestamp>::minimum());

            move |(input, frontier), output| {

                // The operator receives input batches, which it treats as contiguous and will collect and
                // then process as one batch. It captures the input frontier from the batches, from the upstream
                // trace, and from the input frontier, and retires the work through that interval.
                //
                // Reduce may retain capabilities and need to perform work and produce output at times that
                // may not be seen in its input. The standard example is that updates at `(0, 1)` and `(1, 0)`
                // may result in outputs at `(1, 1)` as well, even with no input at that time.

                // Cursors over, and ownership of, the batches received in this invocation.
                let mut batch_cursors = Vec::new();
                let mut batch_storage = Vec::new();

                // Downgrade previous upper limit to be current lower limit.
                lower_limit.clear();
                lower_limit.extend(upper_limit.borrow().iter().cloned());

                // Drain input batches in order, capturing capabilities and the last upper.
                input.for_each(|capability, batches| {
                    capabilities.insert(capability.retain(0));
                    for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
                    }
                });

                // Pull in any subsequent empty batches we believe to exist.
                source_trace.advance_upper(&mut upper_limit);
                // Incorporate the input frontier guarantees as well.
                let mut joined = Antichain::new();
                crate::lattice::antichain_join_into(&upper_limit.borrow()[..], &frontier.frontier()[..], &mut joined);
                upper_limit = joined;

                // We plan to retire the interval [lower_limit, upper_limit), which should be non-empty to proceed.
                if upper_limit != lower_limit {

                    // If we hold no capabilities in the interval [lower_limit, upper_limit) then we have no compute needs,
                    // and could not transmit the outputs even if they were (incorrectly) non-zero.
                    // We do have maintenance work after this logic, and should not fuse this test with the above test.
                    if capabilities.iter().any(|c| !upper_limit.less_equal(c.time())) {

                        // Cursors for navigating the input and output traces (acquired through `lower_limit`),
                        // and a single merged cursor over the batches received in this invocation.
                        let (mut source_cursor, ref source_storage): (Tr1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor");
                        let (mut output_cursor, ref output_storage): (Tr2::Cursor, _) = output_reader.cursor_through(lower_limit.borrow()).expect("failed to acquire output cursor");
                        let (mut batch_cursor, ref batch_storage) = (CursorList::new(batch_cursors, &batch_storage), batch_storage);

                        // Prepare an output buffer and builder for each capability.
                        // TODO: It would be better if all updates went into one batch, but timely dataflow prevents
                        //       this as long as it requires that there is only one capability for each message.
                        let mut buffers = Vec::<(Tr1::Time, Vec<(Tr2::ValOwn, Tr1::Time, Tr2::Diff)>)>::new();
                        let mut builders = Vec::new();
                        for cap in capabilities.iter() {
                            buffers.push((cap.time().clone(), Vec::new()));
                            builders.push(Bu::new());
                        }
                        // Temporary staging for output building.
                        let mut buffer = Bu::Input::default();

                        // Reusable state for performing the per-key computation.
                        let mut thinker = history_replay::HistoryReplayer::new();

                        // Merge the received batch cursor with our list of interesting (key, time) moments.
                        // The interesting moments need to be in the interval to prompt work.

                        // March through the keys we must work on, merging keys from `batch_cursor` with `pending_keys`.
                        let mut pending_pos = 0;
                        while batch_cursor.key_valid(batch_storage) || pending_pos < pending_keys.len() {

                            // Determine the next key we will work on; could be synthetic, could be from a batch.
                            // The loop condition guarantees at least one of the two sources has a key.
                            let key1 = pending_keys.get(pending_pos);
                            let key2 = batch_cursor.get_key(batch_storage);
                            let key = match (key1, key2) {
                                (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2),
                                (Some(key1), None) => key1,
                                (None, Some(key2)) => key2,
                                (None, None) => unreachable!(),
                            };

                            // Populate `interesting_times` with this key's pending times not beyond `upper_limit`.
                            // `prior_pos .. pending_pos` ends up spanning all of this key's pending entries.
                            // TODO: This could just be `pending_time` and indexes within `lower .. upper`.
                            let prior_pos = pending_pos;
                            interesting_times.clear();
                            while pending_keys.get(pending_pos) == Some(key) {
                                let owned_time = Tr1::owned_time(pending_time.index(pending_pos));
                                if !upper_limit.less_equal(&owned_time) { interesting_times.push(owned_time); }
                                pending_pos += 1;
                            }

                            // tidy up times, removing redundancy.
                            sort_dedup(&mut interesting_times);

                            // If there are new updates, or pending times, we must investigate!
                            if batch_cursor.get_key(batch_storage) == Some(key) || !interesting_times.is_empty() {

                                // do the per-key computation.
                                thinker.compute(
                                    key,
                                    (&mut source_cursor, source_storage),
                                    (&mut output_cursor, output_storage),
                                    (&mut batch_cursor, batch_storage),
                                    &interesting_times,
                                    &mut logic,
                                    &upper_limit,
                                    &mut buffers[..],
                                    &mut new_interesting_times,
                                );

                                // Advance the batch cursor past this key, so that the loop's validity check registers the work as done.
                                if batch_cursor.get_key(batch_storage) == Some(key) { batch_cursor.step_key(batch_storage); }

                                // Merge novel pending times with any prior pending times we did not process
                                // (those at or beyond `upper_limit`), and queue them for the next invocation.
                                // TODO: This could be a merge, not a sort_dedup, because both lists should be sorted.
                                for pos in prior_pos .. pending_pos {
                                    let owned_time = Tr1::owned_time(pending_time.index(pos));
                                    if upper_limit.less_equal(&owned_time) { new_interesting_times.push(owned_time); }
                                }
                                sort_dedup(&mut new_interesting_times);
                                for time in new_interesting_times.drain(..) {
                                    next_pending_keys.push_ref(key);
                                    next_pending_time.push_own(&time);
                                }

                                // Sort each buffer by value and move into the corresponding builder.
                                // TODO: This makes assumptions about at least one of (i) the stability of `sort_by`,
                                //       (ii) that the buffers are time-ordered, and (iii) that the builders accept
                                //       arbitrarily ordered times.
                                for index in 0 .. buffers.len() {
                                    buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0));
                                    push(&mut buffer, key, &mut buffers[index].1);
                                    buffers[index].1.clear();
                                    builders[index].push(&mut buffer);

                                }
                            }
                            else {
                                // Nothing to do for this key now: copy over the pending key and times unchanged.
                                for pos in prior_pos .. pending_pos {
                                    next_pending_keys.push_ref(pending_keys.index(pos));
                                    next_pending_time.push_ref(pending_time.index(pos));
                                }
                            }
                        }
                        // Drop to avoid lifetime issues that would lock `pending_{keys, time}`.
                        drop(thinker);

                        // We start sealing output batches from the lower limit (previous upper limit).
                        // In principle, we could update `lower_limit` itself, and it should arrive at
                        // `upper_limit` by the end of the process.
                        output_lower.clear();
                        output_lower.extend(lower_limit.borrow().iter().cloned());

                        // build and ship each batch (because only one capability per message).
                        for (index, builder) in builders.drain(..).enumerate() {

                            // Form the upper limit of the next batch, which includes all times greater
                            // than the input batch, or the capabilities from i + 1 onward.
                            output_upper.clear();
                            output_upper.extend(upper_limit.borrow().iter().cloned());
                            for capability in &capabilities[index + 1 ..] {
                                output_upper.insert_ref(capability.time());
                            }

                            // Only form a batch when its interval is non-empty.
                            if output_upper.borrow() != output_lower.borrow() {

                                let description = Description::new(output_lower.clone(), output_upper.clone(), Antichain::from_elem(Tr1::Time::minimum()));
                                let batch = builder.done(description);

                                // ship batch to the output, and commit to the output trace.
                                output.session(&capabilities[index]).give(batch.clone());
                                output_writer.insert(batch, Some(capabilities[index].time().clone()));

                                // The next batch begins where this one ended.
                                output_lower.clear();
                                output_lower.extend(output_upper.borrow().iter().cloned());
                            }
                        }
                        // This should be true, as the final iteration introduces no capabilities, and
                        // uses exactly `upper_limit` to determine the upper bound. Good to check though.
                        assert!(output_upper.borrow() == upper_limit.borrow());

                        // Refresh pending keys and times, then downgrade capabilities to the frontier of times.
                        // After the swap the cleared old containers serve as the next round's `next_*` containers.
                        pending_keys.clear(); std::mem::swap(&mut next_pending_keys, &mut pending_keys);
                        pending_time.clear(); std::mem::swap(&mut next_pending_time, &mut pending_time);

                        // Update `capabilities` to reflect pending times.
                        let mut frontier = Antichain::<Tr1::Time>::new();
                        let mut owned_time = Tr1::Time::minimum();
                        for pos in 0 .. pending_time.len() {
                            Tr1::clone_time_onto(pending_time.index(pos), &mut owned_time);
                            frontier.insert_ref(&owned_time);
                        }
                        capabilities.downgrade(frontier);
                    }

                    // ensure that observed progress is reflected in the output.
                    output_writer.seal(upper_limit.clone());

                    // We only anticipate future times in advance of `upper_limit`.
                    source_trace.set_logical_compaction(upper_limit.borrow());
                    output_reader.set_logical_compaction(upper_limit.borrow());

                    // We will only slice the data between future batches.
                    source_trace.set_physical_compaction(upper_limit.borrow());
                    output_reader.set_physical_compaction(upper_limit.borrow());
                }

                // Exert trace maintenance if we have been so requested.
                output_writer.exert();
            }
        }
    )
    };

    Arranged { stream, trace: result_trace.unwrap() }
}
302
303
/// Sorts `list` in ascending order and drops repeated elements, so that each
/// remaining element is strictly less than the one after it.
///
/// A `dedup`-style pass runs before the sort as well. It is inexpensive and
/// discards runs of neighbouring repeats, which can shrink the work handed to the sort.
#[inline(never)]
fn sort_dedup<T: Ord>(list: &mut Vec<T>) {
    // Same predicate `Vec::dedup` uses internally.
    let adjacent_equal = |a: &mut T, b: &mut T| a == b;
    // Pre-pass: collapse neighbouring repeats that are already present.
    list.dedup_by(adjacent_equal);
    list.sort();
    // Sorting makes every group of equal elements contiguous; remove what remains.
    list.dedup_by(adjacent_equal);
}
310
311/// Implementation based on replaying historical and new updates together.
312mod history_replay {
313
314 use timely::progress::Antichain;
315
316 use crate::lattice::Lattice;
317 use crate::trace::Cursor;
318 use crate::operators::ValueHistory;
319
320 use super::sort_dedup;
321
322 /// The `HistoryReplayer` is a compute strategy based on moving through existing inputs, interesting times, etc in
323 /// time order, maintaining consolidated representations of updates with respect to future interesting times.
324 pub struct HistoryReplayer<V1, V2, V, T, D1, D2> {
325 input_history: ValueHistory<V1, T, D1>,
326 output_history: ValueHistory<V2, T, D2>,
327 batch_history: ValueHistory<V1, T, D1>,
328 input_buffer: Vec<(V1, D1)>,
329 output_buffer: Vec<(V, D2)>,
330 update_buffer: Vec<(V, D2)>,
331 output_produced: Vec<((V, T), D2)>,
332 synth_times: Vec<T>,
333 meets: Vec<T>,
334 times_current: Vec<T>,
335 temporary: Vec<T>,
336 }
337
338 impl<V1, V2, V, T, D1, D2> HistoryReplayer<V1, V2, V, T, D1, D2>
339 where
340 V1: Copy + Ord,
341 V2: Copy + Ord,
342 V: Clone + Ord,
343 T: Ord + Clone + Lattice,
344 D1: Clone + crate::difference::Semigroup,
345 D2: Clone + crate::difference::Semigroup,
346 {
347 pub fn new() -> Self {
348 HistoryReplayer {
349 input_history: ValueHistory::new(),
350 output_history: ValueHistory::new(),
351 batch_history: ValueHistory::new(),
352 input_buffer: Vec::new(),
353 output_buffer: Vec::new(),
354 update_buffer: Vec::new(),
355 output_produced: Vec::new(),
356 synth_times: Vec::new(),
357 meets: Vec::new(),
358 times_current: Vec::new(),
359 temporary: Vec::new(),
360 }
361 }
362 #[inline(never)]
363 pub fn compute<'a, K, C1, C2, C3, L>(
364 &mut self,
365 key: K,
366 (source_cursor, source_storage): (&mut C1, &'a C1::Storage),
367 (output_cursor, output_storage): (&mut C2, &'a C2::Storage),
368 (batch_cursor, batch_storage): (&mut C3, &'a C3::Storage),
369 times: &Vec<T>,
370 logic: &mut L,
371 upper_limit: &Antichain<T>,
372 outputs: &mut [(T, Vec<(V, T, D2)>)],
373 new_interesting: &mut Vec<T>)
374 where
375 C1: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
376 C2: Cursor<Key<'a> = K, Val<'a> = V2, ValOwn = V, Time = T, Diff = D2>,
377 C3: Cursor<Key<'a> = K, Val<'a> = V1, Time = T, Diff = D1>,
378 K: Copy + Ord,
379 L: FnMut(K, &[(V1, D1)], &mut Vec<(V, D2)>, &mut Vec<(V, D2)>),
380 {
381
382 // The work we need to perform is at times defined principally by the contents of `batch_cursor`
383 // and `times`, respectively "new work we just received" and "old times we were warned about".
384 //
385 // Our first step is to identify these times, so that we can use them to restrict the amount of
386 // information we need to recover from `input` and `output`; as all times of interest will have
387 // some time from `batch_cursor` or `times`, we can compute their meet and advance all other
388 // loaded times by performing the lattice `join` with this value.
389
390 // Load the batch contents.
391 let mut batch_replay = self.batch_history.replay_key(batch_cursor, batch_storage, key, None);
392
393 // We determine the meet of times we must reconsider (those from `batch` and `times`). This meet
394 // can be used to advance other historical times, which may consolidate their representation. As
395 // a first step, we determine the meets of each *suffix* of `times`, which we will use as we play
396 // history forward.
397
398 self.meets.clear();
399 self.meets.extend(times.iter().cloned());
400 for index in (1 .. self.meets.len()).rev() {
401 self.meets[index-1] = self.meets[index-1].meet(&self.meets[index]);
402 }
403
404 // Determine the meet of times in `batch` and `times`.
405 let mut meet = None;
406 update_meet(&mut meet, self.meets.get(0));
407 update_meet(&mut meet, batch_replay.meet());
408
409 // Having determined the meet, we can load the input and output histories, where we
410 // advance all times by joining them with `meet`. The resulting times are more compact
411 // and guaranteed to accumulate identically for times greater or equal to `meet`.
412
413 // Load the input and output histories.
414 let mut input_replay =
415 self.input_history.replay_key(source_cursor, source_storage, key, meet.as_ref());
416 let mut output_replay =
417 self.output_history.replay_key(output_cursor, output_storage, key, meet.as_ref());
418
419 self.synth_times.clear();
420 self.times_current.clear();
421 self.output_produced.clear();
422
423 // The frontier of times we may still consider.
424 // Derived from frontiers of our update histories, supplied times, and synthetic times.
425
426 let mut times_slice = ×[..];
427 let mut meets_slice = &self.meets[..];
428
429 // We have candidate times from `batch` and `times`, as well as times identified by either
430 // `input` or `output`. Finally, we may have synthetic times produced as the join of times
431 // we consider in the course of evaluation. As long as any of these times exist, we need to
432 // keep examining times.
433 while let Some(next_time) = [ batch_replay.time(),
434 times_slice.first(),
435 input_replay.time(),
436 output_replay.time(),
437 self.synth_times.last(),
438 ].into_iter().flatten().min().cloned() {
439
440 // Advance input and output history replayers. This marks applicable updates as active.
441 input_replay.step_while_time_is(&next_time);
442 output_replay.step_while_time_is(&next_time);
443
444 // One of our goals is to determine if `next_time` is "interesting", meaning whether we
445 // have any evidence that we should re-evaluate the user logic at this time. For a time
446 // to be "interesting" it would need to be the join of times that include either a time
447 // from `batch`, `times`, or `synth`. Neither `input` nor `output` times are sufficient.
448
449 // Advance batch history, and capture whether an update exists at `next_time`.
450 let mut interesting = batch_replay.step_while_time_is(&next_time);
451 if interesting { if let Some(meet) = meet.as_ref() { batch_replay.advance_buffer_by(meet); } }
452
453 // advance both `synth_times` and `times_slice`, marking this time interesting if in either.
454 while self.synth_times.last() == Some(&next_time) {
455 // We don't know enough about `next_time` to avoid putting it in to `times_current`.
456 // TODO: If we knew that the time derived from a canceled batch update, we could remove the time.
457 self.times_current.push(self.synth_times.pop().expect("failed to pop from synth_times")); // <-- TODO: this could be a min-heap.
458 interesting = true;
459 }
460 while times_slice.first() == Some(&next_time) {
461 // We know nothing about why we were warned about `next_time`, and must include it to scare future times.
462 self.times_current.push(times_slice[0].clone());
463 times_slice = ×_slice[1..];
464 meets_slice = &meets_slice[1..];
465 interesting = true;
466 }
467
468 // Times could also be interesting if an interesting time is less than them, as they would join
469 // and become the time itself. They may not equal the current time because whatever frontier we
470 // are tracking may not have advanced far enough.
471 // TODO: `batch_history` may or may not be super compact at this point, and so this check might
472 // yield false positives if not sufficiently compact. Maybe we should look into this and see.
473 interesting = interesting || batch_replay.buffer().iter().any(|&((_, ref t),_)| t.less_equal(&next_time));
474 interesting = interesting || self.times_current.iter().any(|t| t.less_equal(&next_time));
475
476 // We should only process times that are not in advance of `upper_limit`.
477 //
478 // We have no particular guarantee that known times will not be in advance of `upper_limit`.
479 // We may have the guarantee that synthetic times will not be, as we test against the limit
480 // before we add the time to `synth_times`.
481 if !upper_limit.less_equal(&next_time) {
482
483 // We should re-evaluate the computation if this is an interesting time.
484 // If the time is uninteresting (and our logic is sound) it is not possible for there to be
485 // output produced. This sounds like a good test to have for debug builds!
486 if interesting {
487
488 // Assemble the input collection at `next_time`. (`self.input_buffer` cleared just after use).
489 debug_assert!(self.input_buffer.is_empty());
490 if let Some(meet) = meet.as_ref() { input_replay.advance_buffer_by(meet) };
491 for ((value, time), diff) in input_replay.buffer().iter() {
492 if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
493 else { self.temporary.push(next_time.join(time)); }
494 }
495 for ((value, time), diff) in batch_replay.buffer().iter() {
496 if time.less_equal(&next_time) { self.input_buffer.push((*value, diff.clone())); }
497 else { self.temporary.push(next_time.join(time)); }
498 }
499 crate::consolidation::consolidate(&mut self.input_buffer);
500
501 // Assemble the output collection at `next_time`. (`self.output_buffer` cleared just after use).
502 if let Some(meet) = meet.as_ref() { output_replay.advance_buffer_by(meet) };
503 for ((value, time), diff) in output_replay.buffer().iter() {
504 if time.less_equal(&next_time) { self.output_buffer.push((C2::owned_val(*value), diff.clone())); }
505 else { self.temporary.push(next_time.join(time)); }
506 }
507 for ((value, time), diff) in self.output_produced.iter() {
508 if time.less_equal(&next_time) { self.output_buffer.push(((*value).to_owned(), diff.clone())); }
509 else { self.temporary.push(next_time.join(time)); }
510 }
511 crate::consolidation::consolidate(&mut self.output_buffer);
512
513 // Apply user logic if non-empty input or output and see what happens!
514 if !self.input_buffer.is_empty() || !self.output_buffer.is_empty() {
515 logic(key, &self.input_buffer[..], &mut self.output_buffer, &mut self.update_buffer);
516 self.input_buffer.clear();
517 self.output_buffer.clear();
518
519 // Having subtracted output updates from user output, consolidate the results to determine
520 // if there is anything worth reporting. Note: this also orders the results by value, so
521 // that could make the above merging plan even easier.
522 //
523 // Stash produced updates into both capability-indexed buffers and `output_produced`.
524 // The two locations are important, in that we will compact `output_produced` as we move
525 // through times, but we cannot compact the output buffers because we need their actual
526 // times.
527 crate::consolidation::consolidate(&mut self.update_buffer);
528 if !self.update_buffer.is_empty() {
529
530 // We *should* be able to find a capability for `next_time`. Any thing else would
531 // indicate a logical error somewhere along the way; either we release a capability
532 // we should have kept, or we have computed the output incorrectly (or both!)
533 let idx = outputs.iter().rev().position(|(time, _)| time.less_equal(&next_time));
534 let idx = outputs.len() - idx.expect("failed to find index") - 1;
535 for (val, diff) in self.update_buffer.drain(..) {
536 self.output_produced.push(((val.clone(), next_time.clone()), diff.clone()));
537 outputs[idx].1.push((val, next_time.clone(), diff));
538 }
539
540 // Advance times in `self.output_produced` and consolidate the representation.
541 // NOTE: We only do this when we add records; it could be that there are situations
542 // where we want to consolidate even without changes (because an initially
543 // large collection can now be collapsed).
544 if let Some(meet) = meet.as_ref() { for entry in &mut self.output_produced { (entry.0).1.join_assign(meet); } }
545 crate::consolidation::consolidate(&mut self.output_produced);
546 }
547 }
548 }
549
550 // Determine synthetic interesting times.
551 //
552 // Synthetic interesting times are produced differently for interesting and uninteresting
553 // times. An uninteresting time must join with an interesting time to become interesting,
554 // which means joins with `self.batch_history` and `self.times_current`. I think we can
555 // skip `self.synth_times` as we haven't gotten to them yet, but we will and they will be
556 // joined against everything.
557
558 // Any time, even uninteresting times, must be joined with the current accumulation of
559 // batch times as well as the current accumulation of `times_current`.
560 self.temporary.extend(batch_replay.buffer().iter().map(|((_,time),_)| time).filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
561 self.temporary.extend(self.times_current.iter().filter(|time| !time.less_equal(&next_time)).map(|time| time.join(&next_time)));
562 sort_dedup(&mut self.temporary);
563
564 // Introduce synthetic times, and re-organize if we add any.
565 let synth_len = self.synth_times.len();
566 for time in self.temporary.drain(..) {
567 // We can either service `join` now, or must delay for the future.
568 if upper_limit.less_equal(&time) {
569 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&time)));
570 new_interesting.push(time);
571 }
572 else {
573 self.synth_times.push(time);
574 }
575 }
576 if self.synth_times.len() > synth_len {
577 self.synth_times.sort_by(|x,y| y.cmp(x));
578 self.synth_times.dedup();
579 }
580 }
581 else if interesting {
582 // We cannot process `next_time` now, and must delay it.
583 //
584 // I think we are probably only here because of an uninteresting time declared interesting,
585 // as initial interesting times are filtered to be in interval, and synthetic times are also
586 // filtered before introducing them to `self.synth_times`.
587 new_interesting.push(next_time.clone());
588 debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time)))
589 }
590
591 // Update `meet` to track the meet of each source of times.
592 meet = None;
593 update_meet(&mut meet, batch_replay.meet());
594 update_meet(&mut meet, input_replay.meet());
595 update_meet(&mut meet, output_replay.meet());
596 for time in self.synth_times.iter() { update_meet(&mut meet, Some(time)); }
597 update_meet(&mut meet, meets_slice.first());
598
599 // Update `times_current` by the frontier.
600 if let Some(meet) = meet.as_ref() {
601 for time in self.times_current.iter_mut() {
602 *time = time.join(meet);
603 }
604 }
605
606 sort_dedup(&mut self.times_current);
607 }
608
609 // Normalize the representation of `new_interesting`, deduplicating and ordering.
610 sort_dedup(new_interesting);
611 }
612 }
613
614 /// Updates an optional meet by an optional time.
615 fn update_meet<T: Lattice+Clone>(meet: &mut Option<T>, other: Option<&T>) {
616 if let Some(time) = other {
617 if let Some(meet) = meet.as_mut() { meet.meet_assign(time); }
618 else { *meet = Some(time.clone()); }
619 }
620 }
621}