// palimpsest_dataflow/trace/implementations/spine_fueled.rs

//! An append-only collection of update batches.
//!
//! The `Spine` is a general-purpose trace implementation based on collecting and merging
//! immutable batches of updates. It is generic with respect to the batch type, and can be
//! instantiated for any implementor of `trace::Batch`.
//!
//! ## Design
//!
//! This spine is represented as a list of layers, where each element in the list is one of
//!
//!   1. `MergeState::Vacant`: empty
//!   2. `MergeState::Single`: a single batch
//!   3. `MergeState::Double`: a pair of batches
//!
//! Each "batch" has the option to be `None`, indicating a non-batch that nonetheless acts
//! as a number of updates proportionate to the level at which it exists (for bookkeeping).
//!
//! Each of the batches at layer i contains at most 2^i elements. The sequence of batches
//! should have the upper bound of one match the lower bound of the next. Batches may be
//! logically empty, with matching upper and lower bounds, as a bookkeeping mechanism.
//!
//! Each batch at layer i is treated as if it contains exactly 2^i elements, even though it
//! may actually contain fewer elements. This allows us to decouple the physical representation
//! from logical amounts of effort invested in each batch. It allows us to begin compaction and
//! to reduce the number of updates, without compromising our ability to continue to move
//! updates along the spine. We are explicitly making the trade-off that while some batches
//! might compact at lower levels, we want to treat them as if they contained their full set of
//! updates for accounting reasons (to apply work to higher levels).
//!
//! We maintain the invariant that for any in-progress merge at level k there should be fewer
//! than 2^k records at levels lower than k. That is, even if we were to apply an unbounded
//! amount of effort to those records, we would not have enough records to prompt a merge into
//! the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress
//! merge at level k, the remaining effort required (number of records minus applied effort) is
//! less than the number of records that would need to be added to reach 2^k records in layers
//! below.
//!
//! ## Mathematics
//!
//! When a merge is initiated, there should be a non-negative *deficit* of updates before the layers
//! below could plausibly produce a new batch for the currently merging layer. We must determine a
//! factor of proportionality, so that newly arrived updates provide at least that amount of "fuel"
//! towards the merging layer, so that the merge completes before lower levels invade.
//!
//! ### Deficit
//!
//! A new merge is initiated only in response to the completion of a prior merge, or the introduction
//! of new records from outside. The latter case is special, and will maintain our invariant trivially,
//! so we will focus on the former case.
//!
//! When a merge at level k completes, assuming we have maintained our invariant, there should be
//! fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to
//! 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are
//! added. This means that a factor of proportionality of four should be sufficient to ensure that
//! the merge completes before a new merge is initiated.
//!
//! When new records get introduced, we will need to roll up any batches at lower levels, which we
//! treat as the introduction of records. Each of these virtual records should be accounted for in
//! the fuel it contributes, as it results in the promotion of batches closer to in-progress merges.
//!
//! ### Fuel sharing
//!
//! We like the idea of applying fuel preferentially to merges at *lower* levels, under the idea that
//! they are easier to complete, and we benefit from fewer total merges in progress. This does delay
//! the completion of merges at higher levels, and is not obviously a total win. If we choose to
//! do this, we should make sure that we correctly account for completed merges at low layers: they
//! should still extract fuel from new updates even though they have completed, at least until they
//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive.
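/*
Sketch (not part of this crate): the factor of four in the "Deficit" section can be
checked with a few lines of arithmetic. The function names below are hypothetical and
exist only for illustration. The sketch assumes, as the module documentation states,
that each batch at level i is accounted as 2^i updates, so a merge of two batches at
level k+1 is charged up to 2^(k+2) units of work.

```rust
/// Upper bound on the work charged to a merge created at level `k + 1`:
/// two batches, each accounted as 2^(k+1) updates.
pub fn merge_work_bound(k: u32) -> u64 {
    1u64 << (k + 2)
}

/// Number of updates that must arrive before the layers below could
/// deliver another batch to level `k + 1` (strictly more than this).
pub fn arrivals_before_next_batch(k: u32) -> u64 {
    1u64 << k
}

/// Units of fuel each arriving update must contribute so that the merge
/// finishes before the next batch can arrive.
pub fn required_fuel_factor(k: u32) -> u64 {
    merge_work_bound(k) / arrivals_before_next_batch(k)
}
```

The ratio does not depend on `k`, which is why a single constant factor can serve every
level of the spine.
*/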

use crate::logging::Logger;
use crate::trace::cursor::CursorList;
use crate::trace::Merger;
use crate::trace::{Batch, BatchReader, ExertionLogic, Trace, TraceReader};

use ::timely::dataflow::operators::generic::OperatorInfo;
use ::timely::order::PartialOrder;
use ::timely::progress::{frontier::AntichainRef, Antichain};

/// An append-only collection of update tuples.
///
/// A spine maintains a small number of immutable collections of update tuples, merging the collections when
/// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with
/// other immutable collections.
pub struct Spine<B: Batch> {
    operator: OperatorInfo,
    logger: Option<Logger>,
    logical_frontier: Antichain<B::Time>, // Times after which the trace must accumulate correctly.
    physical_frontier: Antichain<B::Time>, // Times after which the trace must be able to subset its inputs.
    merging: Vec<MergeState<B>>,           // Several possibly shared collections of updates.
    pending: Vec<B>,                       // Batches at times in advance of `physical_frontier`.
    upper: Antichain<B::Time>,
    effort: usize,
    activator: Option<timely::scheduling::activate::Activator>,
    /// Parameters to `exert_logic`, containing tuples of `(index, count, length)`.
    exert_logic_param: Vec<(usize, usize, usize)>,
    /// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
    exert_logic: Option<ExertionLogic>,
}

use crate::trace::WithLayout;
impl<B: Batch> WithLayout for Spine<B> {
    type Layout = B::Layout;
}

impl<B: Batch + Clone + 'static> TraceReader for Spine<B> {
    type Batch = B;
    type Storage = Vec<B>;
    type Cursor = CursorList<<B as BatchReader>::Cursor>;

    fn cursor_through(
        &mut self,
        upper: AntichainRef<Self::Time>,
    ) -> Option<(Self::Cursor, Self::Storage)> {
        // If `upper` is the minimum frontier, we can return an empty cursor.
        // This can happen with operators that are written to expect the ability to acquire cursors
        // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly.
        if upper.less_equal(&<Self::Time as timely::progress::Timestamp>::minimum()) {
            let cursors = Vec::new();
            let storage = Vec::new();
            return Some((CursorList::new(cursors, &storage), storage));
        }

        // The supplied `upper` should have the property that for each of our
        // batch `lower` and `upper` frontiers, the supplied upper is comparable
        // to the frontier; it should not be incomparable, because the frontiers
        // that we created form a total order. If it is, there is a bug.
        //
        // We should acquire a cursor including all batches whose upper is less
        // or equal to the supplied upper, excluding all batches whose lower is
        // greater or equal to the supplied upper, and if a batch straddles the
        // supplied upper it had better be empty.

        // We shouldn't grab a cursor into a closed trace, right?
        assert!(self.logical_frontier.borrow().len() > 0);

        // Check that `upper` is greater or equal to `self.physical_frontier`.
        // Otherwise, the cut could be in `self.merging` and it is user error anyhow.
        // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1))));
        assert!(PartialOrder::less_equal(
            &self.physical_frontier.borrow(),
            &upper
        ));

        let mut cursors = Vec::new();
        let mut storage = Vec::new();

        for merge_state in self.merging.iter().rev() {
            match merge_state {
                MergeState::Double(variant) => match variant {
                    MergeVariant::InProgress(batch1, batch2, _) => {
                        if !batch1.is_empty() {
                            cursors.push(batch1.cursor());
                            storage.push(batch1.clone());
                        }
                        if !batch2.is_empty() {
                            cursors.push(batch2.cursor());
                            storage.push(batch2.clone());
                        }
                    }
                    MergeVariant::Complete(Some((batch, _))) => {
                        if !batch.is_empty() {
                            cursors.push(batch.cursor());
                            storage.push(batch.clone());
                        }
                    }
                    MergeVariant::Complete(None) => {}
                },
                MergeState::Single(Some(batch)) => {
                    if !batch.is_empty() {
                        cursors.push(batch.cursor());
                        storage.push(batch.clone());
                    }
                }
                MergeState::Single(None) => {}
                MergeState::Vacant => {}
            }
        }

        for batch in self.pending.iter() {
            if !batch.is_empty() {
                // For a non-empty `batch`, it is a catastrophic error if `upper`
                // requires some-but-not-all of the updates in the batch. We can
                // determine this from `upper` and the lower and upper bounds of
                // the batch itself.
                //
                // TODO: It is not clear if this is the 100% correct logic, due
                // to the possible non-total-orderedness of the frontiers.

                let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper);
                let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper);

                if include_lower != include_upper && upper != batch.lower().borrow() {
                    panic!("`cursor_through`: `upper` straddles batch");
                }

                // include pending batches
                if include_upper {
                    cursors.push(batch.cursor());
                    storage.push(batch.clone());
                }
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }
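/*
Sketch (not part of this crate): the straddle test that `cursor_through` applies to each
non-empty pending batch, restated over plain `u64` bounds. This is a simplification:
the real code compares antichains with `PartialOrder::less_equal`, whereas here every
frontier is assumed to be a single totally ordered timestamp. The `Inclusion` enum and
the `classify` function are hypothetical names introduced for the example.

```rust
/// Outcome of comparing a requested `upper` against one pending batch.
#[derive(Debug, PartialEq, Eq)]
pub enum Inclusion {
    /// The batch lies entirely before `upper` and joins the cursor.
    Include,
    /// The batch lies at or beyond `upper` and is left out.
    Exclude,
    /// `upper` cuts through the batch; the real method panics here.
    Straddle,
}

pub fn classify(batch_lower: u64, batch_upper: u64, upper: u64) -> Inclusion {
    let include_lower = batch_lower <= upper;
    let include_upper = batch_upper <= upper;
    if include_lower != include_upper && upper != batch_lower {
        Inclusion::Straddle
    } else if include_upper {
        Inclusion::Include
    } else {
        Inclusion::Exclude
    }
}
```

For a batch covering `[2, 5)`, a requested upper of 5 includes it, 2 excludes it, and 3
straddles it.
*/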
    #[inline]
    fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
        self.logical_frontier.clear();
        self.logical_frontier.extend(frontier.iter().cloned());
    }
    #[inline]
    fn get_logical_compaction(&mut self) -> AntichainRef<'_, B::Time> {
        self.logical_frontier.borrow()
    }
    #[inline]
    fn set_physical_compaction(&mut self, frontier: AntichainRef<'_, B::Time>) {
        // We should never request to rewind the frontier.
        debug_assert!(
            PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier),
            "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n",
            self.physical_frontier,
            frontier
        );
        self.physical_frontier.clear();
        self.physical_frontier.extend(frontier.iter().cloned());
        self.consider_merges();
    }
    #[inline]
    fn get_physical_compaction(&mut self) -> AntichainRef<'_, B::Time> {
        self.physical_frontier.borrow()
    }

    #[inline]
    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
        for batch in self.merging.iter().rev() {
            match batch {
                MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => {
                    f(batch1);
                    f(batch2);
                }
                MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => f(batch),
                MergeState::Single(Some(batch)) => f(batch),
                _ => {}
            }
        }
        for batch in self.pending.iter() {
            f(batch);
        }
    }
}

// A trace implementation for any key type that can be borrowed from or converted into `Key`.
// TODO: Almost all this implementation seems to be generic with respect to the trace and batch types.
impl<B: Batch + Clone + 'static> Trace for Spine<B> {
    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Self::with_effort(1, info, logging, activator)
    }

    /// Apply some amount of effort to trace maintenance.
    ///
    /// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
    fn exert(&mut self) {
        // If there is work to be done, ...
        self.tidy_layers();
        // Determine whether we should apply effort independent of updates.
        if let Some(effort) = self.exert_effort() {
            // If any merges exist, we can directly call `apply_fuel`.
            if self.merging.iter().any(|b| b.is_double()) {
                self.apply_fuel(&mut (effort as isize));
            }
            // Otherwise, we'll need to introduce fake updates to move merges along.
            else {
                // Introduce an empty batch with roughly `effort` virtual updates.
                let level = effort.next_power_of_two().trailing_zeros() as usize;
                self.introduce_batch(None, level);
            }
            // We were not in reduced form, so let's check again in the future.
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    fn set_exert_logic(&mut self, logic: ExertionLogic) {
        self.exert_logic = Some(logic);
    }

    // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
    // merging the batch. This means it is a good time to perform amortized work proportional
    // to the size of batch.
    fn insert(&mut self, batch: Self::Batch) {
        // Log the introduction of a batch.
        self.logger.as_ref().map(|l| {
            l.log(crate::logging::BatchEvent {
                operator: self.operator.global_id,
                length: batch.len(),
            })
        });

        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper);

        self.upper.clone_from(batch.upper());

        // TODO: Consolidate or discard empty batches.
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Completes the trace with a final empty batch.
    fn close(&mut self) {
        if !self.upper.borrow().is_empty() {
            self.insert(B::empty(self.upper.clone(), Antichain::new()));
        }
    }
}

// Drop implementation allows us to log batch drops, to zero out maintained totals.
impl<B: Batch> Drop for Spine<B> {
    fn drop(&mut self) {
        self.drop_batches();
    }
}

impl<B: Batch> Spine<B> {
    /// Drops and logs batches. Used in the `Drop` implementation.
    fn drop_batches(&mut self) {
        if let Some(logger) = &self.logger {
            for batch in self.merging.drain(..) {
                match batch {
                    MergeState::Single(Some(batch)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    }
                    MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch1.len(),
                        });
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch2.len(),
                        });
                    }
                    MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    }
                    _ => {}
                }
            }
            for batch in self.pending.drain(..) {
                logger.log(crate::logging::DropEvent {
                    operator: self.operator.global_id,
                    length: batch.len(),
                });
            }
        }
    }
}

impl<B: Batch> Spine<B> {
    /// Determine the amount of effort we should exert in the absence of updates.
    ///
    /// This method prepares a list describing each layer by its index, batch count, and length.
    /// It supplies this list to `self.exert_logic`, which responds with the amount of exertion to apply.
    fn exert_effort(&mut self) -> Option<usize> {
        self.exert_logic.as_ref().and_then(|exert_logic| {
            self.exert_logic_param.clear();
            self.exert_logic_param
                .extend(
                    self.merging
                        .iter()
                        .enumerate()
                        .rev()
                        .map(|(index, batch)| match batch {
                            MergeState::Vacant => (index, 0, 0),
                            MergeState::Single(_) => (index, 1, batch.len()),
                            MergeState::Double(_) => (index, 2, batch.len()),
                        }),
                );

            (exert_logic)(&self.exert_logic_param[..])
        })
    }
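/*
Sketch (not part of this crate): one possible shape for the closure that `exert_effort`
consults. The closure receives the `(index, count, length)` tuples assembled above and
returns `Some(effort)` to request work or `None` to stay idle. The alias
`ExertLogicSketch` is a stand-in assumed for illustration; the crate's actual
`ExertionLogic` type is defined elsewhere and may carry additional bounds. The policy
itself, `exert_while_unmerged`, is hypothetical.

```rust
use std::sync::Arc;

/// Stand-in for the crate's `ExertionLogic` type (an assumption for this sketch).
pub type ExertLogicSketch = Arc<dyn Fn(&[(usize, usize, usize)]) -> Option<usize>>;

/// Hypothetical policy: request `effort` units of work while more than one
/// layer holds batches, and request nothing once at most one layer remains.
pub fn exert_while_unmerged(effort: usize) -> ExertLogicSketch {
    Arc::new(move |layers: &[(usize, usize, usize)]| {
        // A layer is occupied when its batch count (second field) is non-zero.
        let occupied = layers.iter().filter(|(_, count, _)| *count > 0).count();
        if occupied > 1 {
            Some(effort)
        } else {
            None
        }
    })
}
```

A policy like this keeps rescheduling the operator until the layers have been merged
down to one, at the cost of doing maintenance work when no new updates arrive.
*/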

    /// Describes the merge progress of layers in the trace.
    ///
    /// Intended for diagnostics rather than public consumption.
    #[allow(dead_code)]
    fn describe(&self) -> Vec<(usize, usize)> {
        self.merging
            .iter()
            .map(|b| match b {
                MergeState::Vacant => (0, 0),
                x @ MergeState::Single(_) => (1, x.len()),
                x @ MergeState::Double(_) => (2, x.len()),
            })
            .collect()
    }

    /// Allocates a fueled `Spine` with a specified effort multiplier.
    ///
    /// This trace will merge batches progressively, with each inserted batch applying a multiple
    /// of the batch's length in effort to each merge. The `effort` parameter is that multiplier.
    /// This value should be at least one for the merging to happen; a value of zero is not helpful.
    pub fn with_effort(
        mut effort: usize,
        operator: OperatorInfo,
        logger: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        // Zero effort would supply no fuel to merges, so clamp it to one.
        if effort == 0 {
            effort = 1;
        }

        Spine {
            operator,
            logger,
            logical_frontier: Antichain::from_elem(
                <B::Time as timely::progress::Timestamp>::minimum(),
            ),
            physical_frontier: Antichain::from_elem(
                <B::Time as timely::progress::Timestamp>::minimum(),
            ),
            merging: Vec::new(),
            pending: Vec::new(),
            upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            effort,
            activator,
            exert_logic_param: Vec::default(),
            exert_logic: None,
        }
    }

    /// Migrate data from `self.pending` into `self.merging`.
    ///
    /// This method reflects on the bookmarks held by others that may prevent merging, and in the
    /// case that new batches can be introduced to the pile of mergeable batches, it gets on that.
    #[inline(never)]
    fn consider_merges(&mut self) {
        // TODO: Consider merging pending batches before introducing them.
        // TODO: We could use a `VecDeque` here to draw from the front and append to the back.
        while !self.pending.is_empty()
            && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier)
        //   self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1)))
        {
            // The batch may be taken by the optimized insertion below.
            // Otherwise it is inserted normally at the end of this loop iteration.
            let mut batch = Some(self.pending.remove(0));

            // If `batch` and the most recently inserted batch are both empty, we can just fuse them.
            // We can also replace a structurally empty batch with this empty batch, preserving the
            // apparent record count but now with non-trivial lower and upper bounds.
            if batch.as_ref().unwrap().len() == 0 {
                if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
                    if self.merging[position].is_single() && self.merging[position].len() == 0 {
                        self.insert_at(batch.take(), position);
                        let merged = self.complete_at(position);
                        self.merging[position] = MergeState::Single(merged);
                    }
                }
            }

            // Normal insertion for the batch.
            if let Some(batch) = batch {
                let index = batch.len().next_power_of_two();
                self.introduce_batch(Some(batch), index.trailing_zeros() as usize);
            }
        }

        // Having performed all of our work, if we should perform more work, reschedule ourselves.
        if self.exert_effort().is_some() {
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }
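/*
Sketch (not part of this crate): how the "normal insertion" path above turns a batch
length into a layer index. The helper name `level_for_len` is hypothetical; the
expression inside it is the one used in `consider_merges`.

```rust
/// Level at which a batch of `len` updates is introduced: the exponent of
/// the smallest power of two that is at least `len`.
pub fn level_for_len(len: usize) -> usize {
    len.next_power_of_two().trailing_zeros() as usize
}
```

Lengths 0 and 1 both map to level 0, a length of exactly 2^i maps to level i, and any
length just above 2^i maps to level i+1. This agrees with the design note that a batch
at layer i holds at most 2^i elements.
*/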

    /// Introduces a batch at an indicated level.
    ///
    /// The level indication is often related to the size of the batch, but
    /// it can also be used to artificially fuel the computation by supplying
    /// empty batches at non-trivial indices, to move merges along.
    pub fn introduce_batch(&mut self, batch: Option<B>, batch_index: usize) {
        // Step 0.  Determine an amount of fuel to use for the computation.
        //
        //          Fuel is used to drive maintenance of the data structure,
        //          and in particular is used to make progress through merges
        //          that are in progress. The amount of fuel to use should be
        //          proportional to the number of records introduced, so that
        //          we are guaranteed to complete all merges before they are
        //          required as arguments to merges again.
        //
        //          The fuel use policy is negotiable, in that we might aim
        //          to use relatively less when we can, so that we return
        //          control promptly, or we might account more work to larger
        //          batches. Not clear to me which is best, or if there
        //          should be a configuration knob controlling this.

        // The amount of fuel to use is proportional to 2^batch_index, scaled
        // by a factor of self.effort which determines how eager we are in
        // performing maintenance work. We need to ensure that each merge in
        // progress receives fuel for each introduced batch, and so multiply
        // by that as well.
        if batch_index > 32 {
            println!("Large batch index: {}", batch_index);
        }

        // We believe that eight units of fuel is sufficient for each introduced
        // record, accounted as four for each record, and a potential four more
        // for each virtual record associated with promoting existing smaller
        // batches. We could try and make this be less, or be scaled to merges
        // based on their deficit at time of instantiation. For now, we remain
        // conservative.
        let mut fuel = 8 << batch_index;
        // Scale up by the effort parameter, which is calibrated to one as the
        // minimum amount of effort.
        fuel *= self.effort;
        // Convert to an `isize` so we can observe any fuel shortfall.
        let mut fuel = fuel as isize;

        // Step 1.  Apply fuel to each in-progress merge.
        //
        //          Before we can introduce new updates, we must apply any
        //          fuel to in-progress merges, as this fuel is what ensures
        //          that the merges will be complete by the time we insert
        //          the updates.
        self.apply_fuel(&mut fuel);

        // Step 2.  We must ensure that the invariant that adjacent layers do
        //          not contain two batches will be satisfied when we insert
        //          the batch. We forcibly complete all merges at layers lower
        //          than and including `batch_index`, so that the new batch
        //          is inserted into an empty layer.
        //
        //          We could relax this to "strictly less than `batch_index`"
        //          if the layer above has only a single batch in it, which
        //          seems not implausible if it has been the focus of effort.
        //
        //          This should be interpreted as the introduction of some
        //          volume of fake updates, and we will need to fuel merges
        //          by a proportional amount to ensure that they are not
        //          surprised later on. The number of fake updates should
        //          correspond to the deficit for the layer, which perhaps
        //          we should track explicitly.
        self.roll_up(batch_index);

        // Step 3. This insertion should be into an empty layer. It is a
        //         logical error otherwise, as we may be violating our
        //         invariant, from which all wonderment derives.
        self.insert_at(batch, batch_index);

        // Step 4. Tidy the largest layers.
        //
        //         It is important that we not tidy only smaller layers,
        //         as their ascension is what ensures the merging and
        //         eventual compaction of the largest layers.
        self.tidy_layers();
    }
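/*
Sketch (not part of this crate): the fuel budget computed in Step 0 of `introduce_batch`,
pulled out as a free function. The name `fuel_budget` is hypothetical; the arithmetic
mirrors the code above: eight units per accounted record (2^batch_index records),
scaled by the effort multiplier, then converted to `isize` so a shortfall can be
observed as a negative value.

```rust
/// Fuel made available when a batch is introduced at `batch_index`
/// into a spine configured with the given `effort` multiplier.
pub fn fuel_budget(batch_index: usize, effort: usize) -> isize {
    let fuel = (8usize << batch_index) * effort;
    fuel as isize
}
```

With the default effort of one, a batch introduced at level 3 (accounted as 8 records)
supplies 64 units of fuel to each in-progress merge.
*/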

    /// Ensures that an insertion at layer `index` will succeed.
    ///
    /// This method is subject to the constraint that all existing batches
    /// should occur at higher levels, which requires it to "roll up" batches
    /// present at lower levels before the method is called. In doing this,
    /// we should not introduce more virtual records than 2^index, as that
    /// is the amount of excess fuel we have budgeted for completing merges.
    fn roll_up(&mut self, index: usize) {
        // Ensure entries sufficient for `index`.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // We only need to roll up if there are non-vacant layers.
        if self.merging[..index].iter().any(|m| !m.is_vacant()) {
            // Collect and merge all batches at layers up to but not including `index`.
            let mut merged = None;
            for i in 0..index {
                self.insert_at(merged, i);
                merged = self.complete_at(i);
            }

            // The merged results should be introduced at level `index`, which should
            // be ready to absorb them (possibly creating a new merge at the time).
            self.insert_at(merged, index);

            // If the insertion results in a merge, we should complete it to ensure
            // the upcoming insertion at `index` does not panic.
            if self.merging[index].is_double() {
                let merged = self.complete_at(index);
                self.insert_at(merged, index + 1);
            }
        }
    }

    /// Applies an amount of fuel to merges in progress.
    ///
    /// The supplied `fuel` is for each in-progress merge, and if we want to spend
    /// the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do
    /// so in order to maintain fewer batches on average (at the risk of completing
    /// merges of large batches later, but tbh probably not much later).
    pub fn apply_fuel(&mut self, fuel: &mut isize) {
        // For the moment our strategy is to apply fuel independently to each merge
        // in progress, rather than prioritizing small merges. Prioritizing sounds
        // like a great idea, but we need better accounting in place to ensure that
        // merges that borrow against later layers but then complete still "acquire"
        // fuel to pay back their debts.
        for index in 0..self.merging.len() {
            // Give each level independent fuel, for now.
            let mut fuel = *fuel;
            // Spend this level's fuel on its merge, if one is in progress.
            self.merging[index].work(&mut fuel);
            // `fuel` could have a deficit at this point, meaning we over-spent when
            // we took a merge step. We could ignore this, or maintain the deficit
            // and account future fuel against it before spending again. It isn't
            // clear why that would be especially helpful to do; we might want to
            // avoid overspends at multiple layers in the same invocation (to limit
            // latencies), but there is probably a rich policy space here.

            // If a merge completes, we can immediately merge it into the next
            // level, which is "guaranteed" to be complete at this point, by our
            // fueling discipline.
            if self.merging[index].is_complete() {
                let complete = self.complete_at(index);
                self.insert_at(complete, index + 1);
            }
        }
    }
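/*
Sketch (not part of this crate): a toy model of the "independent fuel per level" policy
used by `apply_fuel`. Each slot holds the remaining work of an in-progress merge, or
`None` when no merge is running at that level. The function name and the slot
representation are hypothetical. The sketch assumes non-negative fuel and omits two
things the real method does: promoting a completed merge into the next level, and
tolerating an overspend (negative fuel) after a merge step.

```rust
/// Applies `fuel` to every in-progress merge and returns the levels that completed.
pub fn apply_fuel_sketch(remaining: &mut [Option<isize>], fuel: isize) -> Vec<usize> {
    let mut completed = Vec::new();
    for (index, slot) in remaining.iter_mut().enumerate() {
        if let Some(work) = slot {
            // Each level draws on its own copy of the full fuel amount;
            // fuel left unspent at one level is not handed to another.
            let spent = fuel.min(*work);
            *work -= spent;
            if *work == 0 {
                *slot = None;
                completed.push(index);
            }
        }
    }
    completed
}
```

With remaining work `[10, none, 25]` and 16 units of fuel, level 0 completes and
level 2 is left with 9 units: the 6 units level 0 did not need are not transferred.
*/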

    /// Inserts a batch at a specific location.
    ///
    /// This is a non-public internal method that can panic if we try and insert into a
    /// layer which already contains two batches (and is still in the process of merging).
    fn insert_at(&mut self, batch: Option<B>, index: usize) {
        // Ensure the spine is large enough.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // Insert the batch at the location.
        match self.merging[index].take() {
            MergeState::Vacant => {
                self.merging[index] = MergeState::Single(batch);
            }
            MergeState::Single(old) => {
                // Log the initiation of a merge.
                self.logger.as_ref().map(|l| {
                    l.log(crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: old.as_ref().map(|b| b.len()).unwrap_or(0),
                        length2: batch.as_ref().map(|b| b.len()).unwrap_or(0),
                        complete: None,
                    })
                });
                let compaction_frontier = self.logical_frontier.borrow();
                self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier);
            }
            MergeState::Double(_) => {
                panic!("Attempted to insert batch into incomplete merge!")
            }
        };
    }

    /// Completes and extracts whatever is at layer `index`.
    fn complete_at(&mut self, index: usize) -> Option<B> {
        if let Some((merged, inputs)) = self.merging[index].complete() {
            if let Some((input1, input2)) = inputs {
                // Log the completion of a merge from existing parts.
                if let Some(l) = self.logger.as_ref() {
                    l.log(crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: input1.len(),
                        length2: input2.len(),
                        complete: Some(merged.len()),
                    });
                }
            }
            Some(merged)
        } else {
            None
        }
    }

    /// Attempts to draw down large layers to appropriately sized layers.
    fn tidy_layers(&mut self) {
        // If the largest layer is complete (not merging), we can attempt
        // to draw it down to the next layer. This is permitted if we can
        // maintain our invariant that below each merge there are at most
        // half the records that would be required to invade the merge.
        if !self.merging.is_empty() {
            let mut length = self.merging.len();
            if self.merging[length - 1].is_single() {
                // To move a batch down, we require that it contain few
                // enough records that the lower level is appropriate,
                // and that moving the batch would not create a merge
                // violating our invariant.

                let appropriate_level = self.merging[length - 1]
                    .len()
                    .next_power_of_two()
                    .trailing_zeros() as usize;

                // Continue only as far as is appropriate.
                while appropriate_level < length - 1 {
                    match self.merging[length - 2].take() {
                        // Vacant or structurally empty batches can be absorbed.
                        MergeState::Vacant | MergeState::Single(None) => {
                            self.merging.remove(length - 2);
                            length = self.merging.len();
                        }
                        // Single batches may initiate a merge, if sizes are
                        // within bounds, but terminate the loop either way.
                        MergeState::Single(Some(batch)) => {
                            // Determine the number of records that might lead
                            // to a merge. Importantly, this is not the number
                            // of actual records, but the sum of upper bounds
                            // based on indices.
                            let mut smaller = 0;
                            for (index, state) in self.merging[..(length - 2)].iter().enumerate() {
                                match state {
                                    MergeState::Vacant => {}
                                    MergeState::Single(_) => {
                                        smaller += 1 << index;
                                    }
                                    MergeState::Double(_) => {
                                        smaller += 2 << index;
                                    }
                                }
                            }

                            if smaller <= (1 << length) / 8 {
                                self.merging.remove(length - 2);
                                self.insert_at(Some(batch), length - 2);
                            } else {
                                self.merging[length - 2] = MergeState::Single(Some(batch));
                            }
                            return;
                        }
                        // If a merge is in progress there is nothing to do.
                        MergeState::Double(state) => {
                            self.merging[length - 2] = MergeState::Double(state);
                            return;
                        }
                    }
                }
            }
        }
    }
}
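
/*
A minimal sketch of the level arithmetic that `tidy_layers` relies on, written
against a toy `Occupancy` enum instead of the real `MergeState<B>`. The names
`Occupancy`, `appropriate_level`, `upper_bound_below`, and `may_draw_down` are
hypothetical and exist only for illustration; the expressions inside them are
copied from the method above.

```rust
/// Smallest level `i` with `2^i >= len` (level 0 when `len <= 1`).
fn appropriate_level(len: usize) -> usize {
    len.next_power_of_two().trailing_zeros() as usize
}

/// Toy stand-in for the occupancy of a layer (no batches attached).
enum Occupancy {
    Vacant,
    Single,
    Double,
}

/// Upper bound on the records held below `limit`: each batch at level
/// `index` is charged `2^index` records regardless of its actual size.
fn upper_bound_below(layers: &[Occupancy], limit: usize) -> usize {
    layers[..limit]
        .iter()
        .enumerate()
        .map(|(index, layer)| match layer {
            Occupancy::Vacant => 0,
            Occupancy::Single => 1usize << index,
            Occupancy::Double => 2usize << index,
        })
        .sum()
}

/// The size test used before drawing the top batch down one level,
/// where `length` is the current number of layers.
fn may_draw_down(smaller: usize, length: usize) -> bool {
    smaller <= (1usize << length) / 8
}
```

For example, a batch of 5 records is appropriate for level 3 (2^3 = 8 >= 5),
and layers `[Single, Vacant, Double]` are charged 1 + 0 + 8 = 9 records.
*/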

/// Describes the state of a layer.
///
/// A layer can be empty, contain a single batch, or contain a pair of batches
/// that are in the process of merging into a batch for the next layer.
enum MergeState<B: Batch> {
    /// An empty layer, containing no updates.
    Vacant,
    /// A layer containing a single batch.
    ///
    /// The `None` variant is used to represent a structurally empty batch present
    /// to ensure the progress of maintenance work.
    Single(Option<B>),
    /// A layer containing two batches, in the process of merging.
    Double(MergeVariant<B>),
}
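
/*
A minimal sketch of how the three layer states evolve as batches arrive,
assuming batches are reduced to bare record counts and that merges finish
instantly (the real spine spends fuel on them). `Layer`, the free function
`insert_at`, and `promote` are hypothetical toys, not part of this module.

```rust
/// Toy layer state; a batch is represented only by its record count.
#[derive(Debug, PartialEq)]
enum Layer {
    Vacant,
    Single(usize),
    Double(usize, usize),
}

/// Inserts a batch of `len` records at `index`, growing the list as needed.
fn insert_at(layers: &mut Vec<Layer>, len: usize, index: usize) {
    while layers.len() <= index {
        layers.push(Layer::Vacant);
    }
    // Take the current state out, leaving `Vacant` behind temporarily.
    let previous = std::mem::replace(&mut layers[index], Layer::Vacant);
    layers[index] = match previous {
        Layer::Vacant => Layer::Single(len),
        Layer::Single(old) => Layer::Double(old, len),
        Layer::Double(..) => panic!("layer {} is already merging", index),
    };
}

/// Finishes the merge at `index`, if any, and moves the result up one level.
fn promote(layers: &mut Vec<Layer>, index: usize) {
    if let Layer::Double(a, b) = layers[index] {
        layers[index] = Layer::Vacant;
        insert_at(layers, a + b, index + 1);
    }
}
```

Inserting two unit batches at level 0 and then promoting leaves level 0 vacant
and a single two-record batch at level 1, much like a carry in a binary counter.
*/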

impl<B: Batch<Time: Eq>> MergeState<B> {
    /// The number of actual updates contained in the level.
    fn len(&self) -> usize {
        match self {
            MergeState::Single(Some(b)) => b.len(),
            MergeState::Double(MergeVariant::InProgress(b1, b2, _)) => b1.len() + b2.len(),
            MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(),
            _ => 0,
        }
    }

    /// True only for the MergeState::Vacant variant.
    fn is_vacant(&self) -> bool {
        matches!(self, MergeState::Vacant)
    }

    /// True only for the MergeState::Single variant.
    fn is_single(&self) -> bool {
        matches!(self, MergeState::Single(_))
    }

    /// True only for the MergeState::Double variant.
    fn is_double(&self) -> bool {
        matches!(self, MergeState::Double(_))
    }

    /// Immediately complete any merge.
    ///
    /// The result is either a batch, if there is a non-trivial batch to return,
    /// or `None` if there is no meaningful batch to return. This does not distinguish
    /// between Vacant entries and structurally empty batches; use the `is_complete()`
    /// method to tell them apart.
    ///
    /// When the batch results from a merge, the two input batches are returned with it.
    fn complete(&mut self) -> Option<(B, Option<(B, B)>)> {
        match std::mem::replace(self, MergeState::Vacant) {
            MergeState::Vacant => None,
            MergeState::Single(batch) => batch.map(|b| (b, None)),
            MergeState::Double(variant) => variant.complete(),
        }
    }

    /// True iff the layer is a complete merge, ready for extraction.
    fn is_complete(&self) -> bool {
        matches!(self, MergeState::Double(MergeVariant::Complete(_)))
    }

    /// Performs a bounded amount of work towards a merge.
    ///
    /// Nothing is returned. If the merge completes, the layer holds a
    /// `MergeVariant::Complete`, and it is the obligation of the caller
    /// to extract the result and correctly install it.
    fn work(&mut self, fuel: &mut isize) {
        // We only perform work for merges in progress.
        if let MergeState::Double(layer) = self {
            layer.work(fuel)
        }
    }

    /// Extract the merge state, typically temporarily.
    fn take(&mut self) -> Self {
        std::mem::replace(self, MergeState::Vacant)
    }

    /// Initiates the merge of an "old" batch with a "new" batch.
    ///
    /// The upper frontier of the old batch should match the lower
    /// frontier of the new batch, with the resulting batch describing
    /// their composed interval, from the lower frontier of the old
    /// batch to the upper frontier of the new batch.
    ///
    /// Either batch may be `None`, which corresponds to a structurally
    /// empty batch whose upper and lower frontiers are equal. This
    /// option exists purely for bookkeeping purposes, and no computation
    /// is performed to merge the two batches.
    fn begin_merge(
        batch1: Option<B>,
        batch2: Option<B>,
        compaction_frontier: AntichainRef<B::Time>,
    ) -> MergeState<B> {
        let variant = match (batch1, batch2) {
            (Some(batch1), Some(batch2)) => {
                assert!(batch1.upper() == batch2.lower());
                let begin_merge = <B as Batch>::begin_merge(&batch1, &batch2, compaction_frontier);
                MergeVariant::InProgress(batch1, batch2, begin_merge)
            }
            // With only one real batch there is nothing to merge.
            (None, Some(x)) | (Some(x), None) => MergeVariant::Complete(Some((x, None))),
            (None, None) => MergeVariant::Complete(None),
        };

        MergeState::Double(variant)
    }
}
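
/*
A minimal sketch of the case analysis in `begin_merge`, assuming a toy batch
that carries only its lower and upper bounds as integers. `ToyBatch`,
`Started`, and this free-standing `begin_merge` are hypothetical; the real
method also constructs a `Batch::Merger` and takes a compaction frontier.

```rust
/// Toy batch: just the interval of times it covers.
#[derive(Debug, Clone, PartialEq)]
struct ToyBatch {
    lower: u64,
    upper: u64,
}

/// Outcome of starting a merge in this sketch.
#[derive(Debug, PartialEq)]
enum Started {
    /// Both inputs are real batches, so merge work remains.
    InProgress(ToyBatch, ToyBatch),
    /// Nothing to merge; the surviving batch, if any, is passed through.
    Complete(Option<ToyBatch>),
}

fn begin_merge(old: Option<ToyBatch>, new: Option<ToyBatch>) -> Started {
    match (old, new) {
        (Some(old), Some(new)) => {
            // Adjacent batches must share a boundary.
            assert!(old.upper == new.lower, "batches must be adjacent");
            Started::InProgress(old, new)
        }
        (Some(x), None) | (None, Some(x)) => Started::Complete(Some(x)),
        (None, None) => Started::Complete(None),
    }
}
```

Only the first arm leaves work to be fueled; the other arms are complete as
soon as they are created.
*/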

/// The state of a layer holding two batches: still merging, or finished.
enum MergeVariant<B: Batch> {
    /// Describes an actual in-progress merge between two non-trivial batches.
    InProgress(B, B, <B as Batch>::Merger),
    /// A merge that requires no further work. May or may not represent a non-trivial batch.
    Complete(Option<(B, Option<(B, B)>)>),
}

impl<B: Batch> MergeVariant<B> {
    /// Completes and extracts the batch, unless structurally empty.
    ///
    /// The result is either `None`, for structurally empty batches,
    /// or a batch and optionally input batches from which it derived.
    fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
        // Effectively unlimited fuel, so the merge must finish in one call.
        let mut fuel = isize::MAX;
        self.work(&mut fuel);
        if let MergeVariant::Complete(batch) = self {
            batch
        } else {
            panic!("Failed to complete a merge!");
        }
    }

    /// Applies some amount of work, potentially completing the merge.
    ///
    /// In case the work completes, `self` becomes `Complete` and retains the
    /// source batches next to the merged result. This allows the caller to
    /// manage the released resources when it extracts them.
    fn work(&mut self, fuel: &mut isize) {
        let variant = std::mem::replace(self, MergeVariant::Complete(None));
        if let MergeVariant::InProgress(b1, b2, mut merge) = variant {
            merge.work(&b1, &b2, fuel);
            // Fuel left over signals that the merger ran out of work.
            if *fuel > 0 {
                *self = MergeVariant::Complete(Some((merge.done(), Some((b1, b2)))));
            } else {
                *self = MergeVariant::InProgress(b1, b2, merge);
            }
        } else {
            *self = variant;
        }
    }
}
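
/*
A minimal sketch of the fuel convention that `MergeVariant::work` and
`apply_fuel` appear to rely on: a merge counts as complete only when fuel is
left over after working on it. `ToyMerger` and this free-standing `apply_fuel`
are hypothetical stand-ins, and the accounting of one fuel unit per unit of
work is an assumption, not the behavior of any particular `Batch::Merger`.

```rust
/// Toy merger with `remaining` units of work left.
struct ToyMerger {
    remaining: isize,
}

impl ToyMerger {
    /// Spends as much fuel as the remaining work allows.
    fn work(&mut self, fuel: &mut isize) {
        let spent = self.remaining.min((*fuel).max(0));
        self.remaining -= spent;
        *fuel -= spent;
    }
}

/// Gives every in-progress merge its own copy of `fuel` and returns the
/// indices of the merges that completed.
fn apply_fuel(merges: &mut [Option<ToyMerger>], fuel: isize) -> Vec<usize> {
    let mut completed = Vec::new();
    for (index, slot) in merges.iter_mut().enumerate() {
        if let Some(merger) = slot.as_mut() {
            // Independent budget per level, as in the method above.
            let mut local = fuel;
            merger.work(&mut local);
            // Leftover fuel is the completion signal.
            if local > 0 {
                *slot = None;
                completed.push(index);
            }
        }
    }
    completed
}
```

One consequence of this convention: a merge whose remaining work exactly
exhausts its fuel is only recognized as complete on the next call, when no
fuel is consumed.
*/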