differential_dataflow/trace/implementations/spine_fueled.rs

//! An append-only collection of update batches.
//!
//! The `Spine` is a general-purpose trace implementation based on collecting and merging
//! immutable batches of updates. It is generic with respect to the batch type, and can be
//! instantiated for any implementor of `trace::Batch`.
//!
//! ## Design
//!
//! This spine is represented as a list of layers, where each element in the list is either
//!
//!   1. MergeState::Vacant  empty
//!   2. MergeState::Single  a single batch
//!   3. MergeState::Double  a pair of batches
//!
15//! Each "batch" has the option to be `None`, indicating a non-batch that nonetheless acts
16//! as a number of updates proportionate to the level at which it exists (for bookkeeping).
17//!
18//! Each of the batches at layer i contains at most 2^i elements. The sequence of batches
19//! should have the upper bound of one match the lower bound of the next. Batches may be
20//! logically empty, with matching upper and lower bounds, as a bookkeeping mechanism.
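//!
//! For example, a newly inserted batch of 1,000 updates lands at layer 10, since 2^10 = 1024
//! is the smallest power of two that covers it; this is the `len.next_power_of_two().trailing_zeros()`
//! computation performed when pending batches are drained into the spine.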
//!
//! Each batch at layer i is treated as if it contains exactly 2^i elements, even though it
//! may actually contain fewer elements. This allows us to decouple the physical representation
//! from logical amounts of effort invested in each batch. It allows us to begin compaction and
//! to reduce the number of updates, without compromising our ability to continue to move
//! updates along the spine. We are explicitly making the trade-off that while some batches
//! might compact at lower levels, we want to treat them as if they contained their full set of
//! updates for accounting reasons (to apply work to higher levels).
//!
//! We maintain the invariant that for any in-progress merge at level k there should be fewer
//! than 2^k records at levels lower than k. That is, even if we were to apply an unbounded
//! amount of effort to those records, we would not have enough records to prompt a merge into
//! the in-progress merge. Ideally, we maintain the extended invariant that for any in-progress
//! merge at level k, the remaining effort required (number of records minus applied effort) is
//! less than the number of records that would need to be added to reach 2^k records in layers
//! below.
//!
//! ## Mathematics
//!
//! When a merge is initiated, there should be a non-negative *deficit* of updates before the layers
//! below could plausibly produce a new batch for the currently merging layer. We must determine a
//! factor of proportionality, so that newly arrived updates provide at least that amount of "fuel"
//! towards the merging layer, ensuring that the merge completes before lower levels invade.
//!
//! ### Deficit:
//!
//! A new merge is initiated only in response to the completion of a prior merge, or the introduction
//! of new records from outside. The latter case is special, and will maintain our invariant trivially,
//! so we will focus on the former case.
//!
//! When a merge at level k completes, assuming we have maintained our invariant, there should be
//! fewer than 2^k records at lower levels. The newly created merge at level k+1 will require up to
//! 2^(k+2) units of work, and should not expect a new batch until strictly more than 2^k records are
//! added. This means that a factor of proportionality of four should be sufficient to ensure that
//! the merge completes before a new merge is initiated.
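//!
//! Concretely: the new merge needs at most 2^(k+2) units of work, and strictly more than 2^k
//! records must arrive before another batch can reach it, so granting four units of fuel per
//! introduced record supplies at least 4 * 2^k = 2^(k+2) units of work before that can happen.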
//!
//! When new records get introduced, we will need to roll up any batches at lower levels, which we
//! treat as the introduction of records. Each of these virtual records should also contribute its
//! share of fuel, as it results in the promotion of batches closer to in-progress merges.
//!
//! ### Fuel sharing
//!
//! We like the idea of applying fuel preferentially to merges at *lower* levels, on the grounds that
//! they are easier to complete, and we benefit from fewer total merges in progress. This does delay
//! the completion of merges at higher levels, and may not obviously be a total win. If we choose to
//! do this, we should make sure that we correctly account for completed merges at low layers: they
//! should still extract fuel from new updates even though they have completed, at least until they
//! have paid back any "debt" to higher layers by continuing to provide fuel as updates arrive.
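//!
//! As a rough sketch, the fuel budgeting that `introduce_batch` implements below amounts to the
//! following (the constant eight and the `effort` multiplier are the conservative choices
//! discussed in that method):
//!
//! ```ignore
//! // Fuel granted to each in-progress merge when a batch arrives at `batch_index`.
//! let mut fuel = 8 << batch_index;  // eight units per (virtual) introduced record
//! fuel *= self.effort;              // user-supplied eagerness multiplier, at least one
//! let mut fuel = fuel as isize;     // signed, so that any shortfall can be observed
//! ```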

use std::fmt::Debug;

use crate::logging::Logger;
use crate::difference::Semigroup;
use crate::lattice::Lattice;
use crate::trace::{Batch, Batcher, Builder, BatchReader, Trace, TraceReader, ExertionLogic};
use crate::trace::cursor::CursorList;
use crate::trace::Merger;

use ::timely::dataflow::operators::generic::OperatorInfo;
use ::timely::progress::{Antichain, frontier::AntichainRef};
use ::timely::order::PartialOrder;

/// An append-only collection of update tuples.
///
/// A spine maintains a small number of immutable collections of update tuples, merging the collections when
/// two have similar sizes. In this way, it allows the addition of more tuples, which may then be merged with
/// other immutable collections.
pub struct Spine<B: Batch, BA, BU>
where
    B::Time: Lattice+Ord,
    B::Diff: Semigroup,
    // Intended constraints:
    // BA: Batcher<Time = B::Time>,
    // BU: Builder<Item=BA::Item, Time=BA::Time, Output = B>,
{
    operator: OperatorInfo,
    logger: Option<Logger>,
    logical_frontier: Antichain<B::Time>,   // Times after which the trace must accumulate correctly.
    physical_frontier: Antichain<B::Time>,  // Times after which the trace must be able to subset its inputs.
    merging: Vec<MergeState<B>>,            // Several possibly shared collections of updates.
    pending: Vec<B>,                        // Batches at times in advance of `frontier`.
    upper: Antichain<B::Time>,
    effort: usize,
    activator: Option<timely::scheduling::activate::Activator>,
    /// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
    exert_logic: ExertionLogic,
    phantom: std::marker::PhantomData<(BA, BU)>,
}

impl<B, BA, BU> TraceReader for Spine<B, BA, BU>
where
    B: Batch+Clone+'static,
    B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
    B::Diff: Semigroup,
{
    type Key<'a> = B::Key<'a>;
    type KeyOwned = B::KeyOwned;
    type Val<'a> = B::Val<'a>;
    type ValOwned = B::ValOwned;
    type Time = B::Time;
    type Diff = B::Diff;

    type Batch = B;
    type Storage = Vec<B>;
    type Cursor = CursorList<<B as BatchReader>::Cursor>;

    fn cursor_through(&mut self, upper: AntichainRef<Self::Time>) -> Option<(Self::Cursor, Self::Storage)> {

        // If `upper` is the minimum frontier, we can return an empty cursor.
        // This can happen with operators that are written to expect the ability to acquire cursors
        // for their prior frontiers, and which start at `[T::minimum()]`, such as `Reduce`, sadly.
        if upper.less_equal(&<Self::Time as timely::progress::Timestamp>::minimum()) {
            let cursors = Vec::new();
            let storage = Vec::new();
            return Some((CursorList::new(cursors, &storage), storage));
        }

        // The supplied `upper` should have the property that for each of our
        // batch `lower` and `upper` frontiers, the supplied upper is comparable
        // to the frontier; it should not be incomparable, because the frontiers
        // that we created form a total order. If it is, there is a bug.
        //
        // We should acquire a cursor including all batches whose upper is less
        // or equal to the supplied upper, excluding all batches whose lower is
        // greater or equal to the supplied upper, and if a batch straddles the
        // supplied upper it had better be empty.

        // We shouldn't grab a cursor into a closed trace, right?
        assert!(self.logical_frontier.borrow().len() > 0);

        // Check that `upper` is greater or equal to `self.physical_frontier`.
        // Otherwise, the cut could be in `self.merging` and it is user error anyhow.
        // assert!(upper.iter().all(|t1| self.physical_frontier.iter().any(|t2| t2.less_equal(t1))));
        assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &upper));

        let mut cursors = Vec::new();
        let mut storage = Vec::new();

        for merge_state in self.merging.iter().rev() {
            match merge_state {
                MergeState::Double(variant) => {
                    match variant {
                        MergeVariant::InProgress(batch1, batch2, _) => {
                            if !batch1.is_empty() {
                                cursors.push(batch1.cursor());
                                storage.push(batch1.clone());
                            }
                            if !batch2.is_empty() {
                                cursors.push(batch2.cursor());
                                storage.push(batch2.clone());
                            }
                        },
                        MergeVariant::Complete(Some((batch, _))) => {
                            if !batch.is_empty() {
                                cursors.push(batch.cursor());
                                storage.push(batch.clone());
                            }
                        }
                        MergeVariant::Complete(None) => { },
                    }
                },
                MergeState::Single(Some(batch)) => {
                    if !batch.is_empty() {
                        cursors.push(batch.cursor());
                        storage.push(batch.clone());
                    }
                },
                MergeState::Single(None) => { },
                MergeState::Vacant => { },
            }
        }

        for batch in self.pending.iter() {

            if !batch.is_empty() {

                // For a non-empty `batch`, it is a catastrophic error if `upper`
                // requires some-but-not-all of the updates in the batch. We can
                // determine this from `upper` and the lower and upper bounds of
                // the batch itself.
                //
                // TODO: It is not clear if this is the 100% correct logic, due
                // to the possible non-total-orderedness of the frontiers.

                let include_lower = PartialOrder::less_equal(&batch.lower().borrow(), &upper);
                let include_upper = PartialOrder::less_equal(&batch.upper().borrow(), &upper);

                if include_lower != include_upper && upper != batch.lower().borrow() {
                    panic!("`cursor_through`: `upper` straddles batch");
                }

                // include pending batches
                if include_upper {
                    cursors.push(batch.cursor());
                    storage.push(batch.clone());
                }
            }
        }

        Some((CursorList::new(cursors, &storage), storage))
    }
    #[inline]
    fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
        self.logical_frontier.clear();
        self.logical_frontier.extend(frontier.iter().cloned());
    }
    #[inline]
    fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
    #[inline]
    fn set_physical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
        // We should never request to rewind the frontier.
        debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
        self.physical_frontier.clear();
        self.physical_frontier.extend(frontier.iter().cloned());
        self.consider_merges();
    }
    #[inline]
    fn get_physical_compaction(&mut self) -> AntichainRef<B::Time> { self.physical_frontier.borrow() }

    #[inline]
    fn map_batches<F: FnMut(&Self::Batch)>(&self, mut f: F) {
        for batch in self.merging.iter().rev() {
            match batch {
                MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => { f(batch1); f(batch2); },
                MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => { f(batch) },
                MergeState::Single(Some(batch)) => { f(batch) },
                _ => { },
            }
        }
        for batch in self.pending.iter() {
            f(batch);
        }
    }
}

// A trace implementation for any key type that can be borrowed from or converted into `Key`.
// TODO: Almost all this implementation seems to be generic with respect to the trace and batch types.
impl<B, BA, BU> Trace for Spine<B, BA, BU>
where
    B: Batch+Clone+'static,
    B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
    B::Diff: Semigroup,
    BA: Batcher<Time = B::Time>,
    BU: Builder<Item=BA::Item, Time=BA::Time, Output = B>,
{
    /// A type used to assemble batches from disordered updates.
    type Batcher = BA;
    /// A type used to assemble batches from ordered update sequences.
    type Builder = BU;

    fn new(
        info: ::timely::dataflow::operators::generic::OperatorInfo,
        logging: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {
        Self::with_effort(1, info, logging, activator)
    }

    /// Apply some amount of effort to trace maintenance.
    ///
    /// Whether and how much effort to apply is determined by `self.exert_logic`, a closure the user can set.
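    ///
    /// As a rough sketch of such a closure (assuming `ExertionLogic` accepts the boxed
    /// iterator of `(level, batch count, record count)` tuples that `exert_effort` builds;
    /// the policy shown is illustrative, not the default, which exerts nothing):
    /// ```ignore
    /// trace.set_exert_logic(std::sync::Arc::new(|mut layers| {
    ///     // Exert a fixed budget whenever some layer still holds an in-progress merge.
    ///     if layers.any(|(_level, batches, _len)| batches > 1) { Some(1_000) } else { None }
    /// }));
    /// ```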
    fn exert(&mut self) {
        // If there is work to be done, ...
        self.tidy_layers();
        // Determine whether we should apply effort independent of updates.
        if let Some(effort) = self.exert_effort() {

            // If any merges exist, we can directly call `apply_fuel`.
            if self.merging.iter().any(|b| b.is_double()) {
                self.apply_fuel(&mut (effort as isize));
            }
            // Otherwise, we'll need to introduce fake updates to move merges along.
            else {
                // Introduce an empty batch with roughly `effort` many virtual updates.
                let level = effort.next_power_of_two().trailing_zeros() as usize;
                self.introduce_batch(None, level);
            }
            // We were not in reduced form, so let's check again in the future.
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    fn set_exert_logic(&mut self, logic: ExertionLogic) {
        self.exert_logic = logic;
    }

    // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin
    // merging the batch. This means it is a good time to perform amortized work proportional
    // to the size of batch.
    fn insert(&mut self, batch: Self::Batch) {

        // Log the introduction of a batch.
        self.logger.as_ref().map(|l| l.log(crate::logging::BatchEvent {
            operator: self.operator.global_id,
            length: batch.len()
        }));

        assert!(batch.lower() != batch.upper());
        assert_eq!(batch.lower(), &self.upper);

        self.upper.clone_from(batch.upper());

        // TODO: Consolidate or discard empty batches.
        self.pending.push(batch);
        self.consider_merges();
    }

    /// Completes the trace with a final empty batch.
    fn close(&mut self) {
        if !self.upper.borrow().is_empty() {
            let builder = Self::Builder::new();
            let batch = builder.done(self.upper.clone(), Antichain::new(), Antichain::from_elem(<Self::Time as timely::progress::Timestamp>::minimum()));
            self.insert(batch);
        }
    }
}

// Drop implementation allows us to log batch drops, to zero out maintained totals.
impl<B, BA, BU> Drop for Spine<B, BA, BU>
where
    B: Batch,
    B::Time: Lattice+Ord,
    B::Diff: Semigroup,
{
    fn drop(&mut self) {
        self.drop_batches();
    }
}


impl<B, BA, BU> Spine<B, BA, BU>
where
    B: Batch,
    B::Time: Lattice+Ord,
    B::Diff: Semigroup,
{
    /// Drops and logs batches. Used in `set_logical_compaction` and drop.
    fn drop_batches(&mut self) {
        if let Some(logger) = &self.logger {
            for batch in self.merging.drain(..) {
                match batch {
                    MergeState::Single(Some(batch)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    },
                    MergeState::Double(MergeVariant::InProgress(batch1, batch2, _)) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch1.len(),
                        });
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch2.len(),
                        });
                    },
                    MergeState::Double(MergeVariant::Complete(Some((batch, _)))) => {
                        logger.log(crate::logging::DropEvent {
                            operator: self.operator.global_id,
                            length: batch.len(),
                        });
                    }
                    _ => { },
                }
            }
            for batch in self.pending.drain(..) {
                logger.log(crate::logging::DropEvent {
                    operator: self.operator.global_id,
                    length: batch.len(),
                });
            }
        }
    }
}

impl<B, BA, BU> Spine<B, BA, BU>
where
    B: Batch,
    B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
    B::Diff: Semigroup,
{
    /// Determine the amount of effort we should exert in the absence of updates.
    ///
    /// This method prepares an iterator over batches, including the level, count, and length of each layer.
    /// It supplies this to `self.exert_logic`, which produces the amount of exertion to apply, if any.
    fn exert_effort(&self) -> Option<usize> {
        (self.exert_logic)(
            Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
                match batch {
                    MergeState::Vacant => (index, 0, 0),
                    MergeState::Single(_) => (index, 1, batch.len()),
                    MergeState::Double(_) => (index, 2, batch.len()),
                }
            }))
        )
    }

    /// Describes the merge progress of layers in the trace.
    ///
    /// Intended for diagnostics rather than public consumption.
    #[allow(dead_code)]
    fn describe(&self) -> Vec<(usize, usize)> {
        self.merging
            .iter()
            .map(|b| match b {
                MergeState::Vacant => (0, 0),
                x @ MergeState::Single(_) => (1, x.len()),
                x @ MergeState::Double(_) => (2, x.len()),
            })
            .collect()
    }

    /// Allocates a fueled `Spine` with a specified effort multiplier.
    ///
    /// This trace will merge batches progressively, with each inserted batch applying a multiple
    /// of the batch's length in effort to each merge. The `effort` parameter is that multiplier.
    /// This value should be at least one for the merging to happen; a value of zero is not helpful.
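    ///
    /// A hypothetical construction with a multiplier of four; the `info` and `activator`
    /// values would come from the operator hosting the trace:
    /// ```ignore
    /// let mut trace = Spine::<B, BA, BU>::with_effort(4, info, None, Some(activator));
    /// ```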
    pub fn with_effort(
        mut effort: usize,
        operator: OperatorInfo,
        logger: Option<crate::logging::Logger>,
        activator: Option<timely::scheduling::activate::Activator>,
    ) -> Self {

        // Zero effort is .. not smart.
        if effort == 0 { effort = 1; }

        Spine {
            operator,
            logger,
            logical_frontier: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            physical_frontier: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            merging: Vec::new(),
            pending: Vec::new(),
            upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
            effort,
            activator,
            exert_logic: std::sync::Arc::new(|_batches| None),
            phantom: std::marker::PhantomData,
        }
    }

    /// Migrate data from `self.pending` into `self.merging`.
    ///
    /// This method reflects on the bookmarks held by others that may prevent merging, and in the
    /// case that new batches can be introduced to the pile of mergeable batches, it gets on that.
    #[inline(never)]
    fn consider_merges(&mut self) {

        // TODO: Consider merging pending batches before introducing them.
        // TODO: We could use a `VecDeque` here to draw from the front and append to the back.
        while !self.pending.is_empty() && PartialOrder::less_equal(self.pending[0].upper(), &self.physical_frontier)
            //   self.physical_frontier.iter().all(|t1| self.pending[0].upper().iter().any(|t2| t2.less_equal(t1)))
        {
            // Batch can be taken in optimized insertion.
            // Otherwise it is inserted normally at the end of the method.
            let mut batch = Some(self.pending.remove(0));

            // If `batch` and the most recently inserted batch are both empty, we can just fuse them.
            // We can also replace a structurally empty batch with this empty batch, preserving the
            // apparent record count but now with non-trivial lower and upper bounds.
            if batch.as_ref().unwrap().len() == 0 {
                if let Some(position) = self.merging.iter().position(|m| !m.is_vacant()) {
                    if self.merging[position].is_single() && self.merging[position].len() == 0 {
                        self.insert_at(batch.take(), position);
                        let merged = self.complete_at(position);
                        self.merging[position] = MergeState::Single(merged);
                    }
                }
            }

            // Normal insertion for the batch.
            if let Some(batch) = batch {
                let index = batch.len().next_power_of_two();
                self.introduce_batch(Some(batch), index.trailing_zeros() as usize);
            }
        }

        // Having performed all of our work, if we should perform more work, reschedule ourselves.
        if self.exert_effort().is_some() {
            if let Some(activator) = &self.activator {
                activator.activate();
            }
        }
    }

    /// Introduces a batch at an indicated level.
    ///
    /// The level indication is often related to the size of the batch, but
    /// it can also be used to artificially fuel the computation by supplying
    /// empty batches at non-trivial indices, to move merges along.
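    ///
    /// For instance, `exert` above drives maintenance without real data by introducing a
    /// structurally empty batch at a level matching the desired effort:
    /// ```ignore
    /// let level = effort.next_power_of_two().trailing_zeros() as usize;
    /// spine.introduce_batch(None, level);  // fuels merges as if about 2^level updates arrived
    /// ```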
    pub fn introduce_batch(&mut self, batch: Option<B>, batch_index: usize) {

        // Step 0.  Determine an amount of fuel to use for the computation.
        //
        //          Fuel is used to drive maintenance of the data structure,
        //          and in particular is used to make progress through merges
        //          that are in progress. The amount of fuel to use should be
        //          proportional to the number of records introduced, so that
        //          we are guaranteed to complete all merges before they are
        //          required as arguments to merges again.
        //
        //          The fuel use policy is negotiable, in that we might aim
        //          to use relatively less when we can, so that we return
        //          control promptly, or we might account more work to larger
        //          batches. Not clear to me which is best, or if there
        //          should be a configuration knob controlling this.

        // The amount of fuel to use is proportional to 2^batch_index, scaled
        // by a factor of self.effort which determines how eager we are in
        // performing maintenance work. We need to ensure that each merge in
        // progress receives fuel for each introduced batch, and so multiply
        // by that as well.
        if batch_index > 32 { println!("Large batch index: {}", batch_index); }

        // We believe that eight units of fuel is sufficient for each introduced
        // record, accounted as four for each record, and a potential four more
        // for each virtual record associated with promoting existing smaller
        // batches. We could try and make this be less, or be scaled to merges
        // based on their deficit at time of instantiation. For now, we remain
        // conservative.
        let mut fuel = 8 << batch_index;
        // Scale up by the effort parameter, which is calibrated to one as the
        // minimum amount of effort.
        fuel *= self.effort;
        // Convert to an `isize` so we can observe any fuel shortfall.
        let mut fuel = fuel as isize;

        // Step 1.  Apply fuel to each in-progress merge.
        //
        //          Before we can introduce new updates, we must apply any
        //          fuel to in-progress merges, as this fuel is what ensures
        //          that the merges will be complete by the time we insert
        //          the updates.
        self.apply_fuel(&mut fuel);

        // Step 2.  We must ensure that the invariant that adjacent layers do
        //          not contain two batches will be satisfied when we insert
        //          the batch. We do so by forcibly completing all merges at
        //          layers lower than and including `batch_index`, so that the
        //          new batch is inserted into an empty layer.
        //
        //          We could relax this to "strictly less than `batch_index`"
        //          if the layer above has only a single batch in it, which
        //          seems not implausible if it has been the focus of effort.
        //
        //          This should be interpreted as the introduction of some
        //          volume of fake updates, and we will need to fuel merges
        //          by a proportional amount to ensure that they are not
        //          surprised later on. The number of fake updates should
        //          correspond to the deficit for the layer, which perhaps
        //          we should track explicitly.
        self.roll_up(batch_index);

        // Step 3. This insertion should be into an empty layer. It is a
        //         logical error otherwise, as we may be violating our
        //         invariant, from which all wonderment derives.
        self.insert_at(batch, batch_index);

        // Step 4. Tidy the largest layers.
        //
        //         It is important that we not tidy only smaller layers,
        //         as their ascension is what ensures the merging and
        //         eventual compaction of the largest layers.
        self.tidy_layers();
    }

    /// Ensures that an insertion at layer `index` will succeed.
    ///
    /// This method is subject to the constraint that all existing batches
    /// should occur at higher levels, which requires it to "roll up" batches
    /// present at lower levels before the insertion happens. In doing this,
    /// we should not introduce more virtual records than 2^index, as that
    /// is the amount of excess fuel we have budgeted for completing merges.
    fn roll_up(&mut self, index: usize) {

        // Ensure entries sufficient for `index`.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // We only need to roll up if there are non-vacant layers.
        if self.merging[.. index].iter().any(|m| !m.is_vacant()) {

            // Collect and merge all batches at layers up to but not including `index`.
            let mut merged = None;
            for i in 0 .. index {
                self.insert_at(merged, i);
                merged = self.complete_at(i);
            }

            // The merged results should be introduced at level `index`, which should
            // be ready to absorb them (possibly creating a new merge at the time).
            self.insert_at(merged, index);

            // If the insertion results in a merge, we should complete it to ensure
            // the upcoming insertion at `index` does not panic.
            if self.merging[index].is_double() {
                let merged = self.complete_at(index);
                self.insert_at(merged, index + 1);
            }
        }
    }

    /// Applies an amount of fuel to merges in progress.
    ///
    /// The supplied `fuel` is for each in progress merge, and if we want to spend
    /// the fuel non-uniformly (e.g. prioritizing merges at low layers) we could do
    /// so in order to maintain fewer batches on average (at the risk of completing
    /// merges of large batches later, but tbh probably not much later).
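    ///
    /// A minimal sketch of driving maintenance by hand (the budget shown is arbitrary):
    /// ```ignore
    /// let mut fuel: isize = 1_000;
    /// spine.apply_fuel(&mut fuel);
    /// // A negative remainder means the final merge step overshot the budget.
    /// ```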
    pub fn apply_fuel(&mut self, fuel: &mut isize) {
        // For the moment our strategy is to apply fuel independently to each merge
        // in progress, rather than prioritizing small merges. This sounds like a
        // great idea, but we need better accounting in place to ensure that merges
        // that borrow against later layers but then complete still "acquire" fuel
        // to pay back their debts.
        for index in 0 .. self.merging.len() {
            // Give each level independent fuel, for now.
            let mut fuel = *fuel;
            // Pass along various logging stuffs, in case we need to report success.
            self.merging[index].work(&mut fuel);
            // `fuel` could have a deficit at this point, meaning we over-spent when
            // we took a merge step. We could ignore this, or maintain the deficit
            // and account future fuel against it before spending again. It isn't
            // clear why that would be especially helpful to do; we might want to
            // avoid overspends at multiple layers in the same invocation (to limit
            // latencies), but there is probably a rich policy space here.

            // If a merge completes, we can immediately merge it in to the next
            // level, which is "guaranteed" to be complete at this point, by our
            // fueling discipline.
            if self.merging[index].is_complete() {
                let complete = self.complete_at(index);
                self.insert_at(complete, index+1);
            }
        }
    }

    /// Inserts a batch at a specific location.
    ///
    /// This is a non-public internal method that can panic if we try and insert into a
    /// layer which already contains two batches (and is still in the process of merging).
    fn insert_at(&mut self, batch: Option<B>, index: usize) {
        // Ensure the spine is large enough.
        while self.merging.len() <= index {
            self.merging.push(MergeState::Vacant);
        }

        // Insert the batch at the location.
        match self.merging[index].take() {
            MergeState::Vacant => {
                self.merging[index] = MergeState::Single(batch);
            }
            MergeState::Single(old) => {
                // Log the initiation of a merge.
                self.logger.as_ref().map(|l| l.log(
                    crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: old.as_ref().map(|b| b.len()).unwrap_or(0),
                        length2: batch.as_ref().map(|b| b.len()).unwrap_or(0),
                        complete: None,
                    }
                ));
                let compaction_frontier = self.logical_frontier.borrow();
                self.merging[index] = MergeState::begin_merge(old, batch, compaction_frontier);
            }
            MergeState::Double(_) => {
                panic!("Attempted to insert batch into incomplete merge!")
            }
        };
    }

    /// Completes and extracts whatever is at layer `index`.
    fn complete_at(&mut self, index: usize) -> Option<B> {
        if let Some((merged, inputs)) = self.merging[index].complete() {
            if let Some((input1, input2)) = inputs {
                // Log the completion of a merge from existing parts.
                self.logger.as_ref().map(|l| l.log(
                    crate::logging::MergeEvent {
                        operator: self.operator.global_id,
                        scale: index,
                        length1: input1.len(),
                        length2: input2.len(),
                        complete: Some(merged.len()),
                    }
                ));
            }
            Some(merged)
        }
        else {
            None
        }
    }

    /// Attempts to draw down large layers to size appropriate layers.
    fn tidy_layers(&mut self) {

        // If the largest layer is complete (not merging), we can attempt
        // to draw it down to the next layer. This is permitted if we can
        // maintain our invariant that below each merge there are at most
        // half the records that would be required to invade the merge.
        if !self.merging.is_empty() {
            let mut length = self.merging.len();
            if self.merging[length-1].is_single() {

                // To move a batch down, we require that it contain few
                // enough records that the lower level is appropriate,
                // and that moving the batch would not create a merge
                // violating our invariant.

                let appropriate_level = self.merging[length-1].len().next_power_of_two().trailing_zeros() as usize;

                // Continue only as far as is appropriate
                while appropriate_level < length-1 {

                    match self.merging[length-2].take() {
                        // Vacant or structurally empty batches can be absorbed.
                        MergeState::Vacant | MergeState::Single(None) => {
                            self.merging.remove(length-2);
                            length = self.merging.len();
                        }
                        // Single batches may initiate a merge, if sizes are
                        // within bounds, but terminate the loop either way.
                        MergeState::Single(Some(batch)) => {

                            // Determine the number of records that might lead
                            // to a merge. Importantly, this is not the number
                            // of actual records, but the sum of upper bounds
                            // based on indices.
                            let mut smaller = 0;
                            for (index, batch) in self.merging[..(length-2)].iter().enumerate() {
                                match batch {
                                    MergeState::Vacant => { },
                                    MergeState::Single(_) => { smaller += 1 << index; },
                                    MergeState::Double(_) => { smaller += 2 << index; },
                                }
                            }

                            if smaller <= (1 << length) / 8 {
                                self.merging.remove(length-2);
                                self.insert_at(Some(batch), length-2);
                            }
                            else {
                                self.merging[length-2] = MergeState::Single(Some(batch));
                            }
                            return;
                        }
                        // If a merge is in progress there is nothing to do.
                        MergeState::Double(state) => {
                            self.merging[length-2] = MergeState::Double(state);
                            return;
                        }
                    }
                }
            }
        }
    }
}


/// Describes the state of a layer.
///
/// A layer can be empty, contain a single batch, or contain a pair of batches
/// that are in the process of merging into a batch for the next layer.
enum MergeState<B: Batch> {
    /// An empty layer, containing no updates.
    Vacant,
    /// A layer containing a single batch.
    ///
    /// The `None` variant is used to represent a structurally empty batch present
    /// to ensure the progress of maintenance work.
    Single(Option<B>),
    /// A layer containing two batches, in the process of merging.
    Double(MergeVariant<B>),
}

impl<B: Batch> MergeState<B> where B::Time: Eq {

    /// The number of actual updates contained in the level.
    fn len(&self) -> usize {
        match self {
            MergeState::Single(Some(b)) => b.len(),
            MergeState::Double(MergeVariant::InProgress(b1,b2,_)) => b1.len() + b2.len(),
            MergeState::Double(MergeVariant::Complete(Some((b, _)))) => b.len(),
            _ => 0,
        }
    }

    /// True only for the MergeState::Vacant variant.
    fn is_vacant(&self) -> bool {
        if let MergeState::Vacant = self { true } else { false }
    }

    /// True only for the MergeState::Single variant.
    fn is_single(&self) -> bool {
        if let MergeState::Single(_) = self { true } else { false }
    }

    /// True only for the MergeState::Double variant.
    fn is_double(&self) -> bool {
        if let MergeState::Double(_) = self { true } else { false }
    }

    /// Immediately complete any merge.
    ///
    /// The result is either a batch, if there is a non-trivial batch to return,
    /// or `None` if there is no meaningful batch to return. This does not distinguish
    /// between Vacant entries and structurally empty batches, which should be done
    /// with the `is_complete()` method.
    ///
    /// The result additionally carries the input batches from which it was merged, when they exist.
    fn complete(&mut self) -> Option<(B, Option<(B, B)>)>  {
        match std::mem::replace(self, MergeState::Vacant) {
            MergeState::Vacant => None,
            MergeState::Single(batch) => batch.map(|b| (b, None)),
            MergeState::Double(variant) => variant.complete(),
        }
    }

    /// True iff the layer is a complete merge, ready for extraction.
    fn is_complete(&mut self) -> bool {
        if let MergeState::Double(MergeVariant::Complete(_)) = self {
            true
        }
        else {
            false
        }
    }

    /// Performs a bounded amount of work towards a merge.
    ///
    /// If the merge completes, the state becomes `Complete` and holds the
    /// resulting batch; it is the obligation of the caller to extract and
    /// correctly install the result.
    fn work(&mut self, fuel: &mut isize) {
        // We only perform work for merges in progress.
        if let MergeState::Double(layer) = self {
            layer.work(fuel)
        }
    }

    /// Extract the merge state, typically temporarily.
    fn take(&mut self) -> Self {
        std::mem::replace(self, MergeState::Vacant)
    }

    /// Initiates the merge of an "old" batch with a "new" batch.
    ///
    /// The upper frontier of the old batch should match the lower
    /// frontier of the new batch, with the resulting batch describing
    /// their composed interval, from the lower frontier of the old
    /// batch to the upper frontier of the new batch.
    ///
    /// Either batch may be `None`, which corresponds to a structurally
    /// empty batch whose upper and lower frontiers are equal. This
    /// option exists purely for bookkeeping purposes, and no computation
    /// is performed to merge the two batches.
    fn begin_merge(batch1: Option<B>, batch2: Option<B>, compaction_frontier: AntichainRef<B::Time>) -> MergeState<B> {
        let variant =
        match (batch1, batch2) {
            (Some(batch1), Some(batch2)) => {
                assert!(batch1.upper() == batch2.lower());
                let begin_merge = <B as Batch>::begin_merge(&batch1, &batch2, compaction_frontier);
                MergeVariant::InProgress(batch1, batch2, begin_merge)
            }
            (None, Some(x)) => MergeVariant::Complete(Some((x, None))),
            (Some(x), None) => MergeVariant::Complete(Some((x, None))),
            (None, None) => MergeVariant::Complete(None),
        };

        MergeState::Double(variant)
    }
}

enum MergeVariant<B: Batch> {
    /// Describes an actual in-progress merge between two non-trivial batches.
    InProgress(B, B, <B as Batch>::Merger),
    /// A merge that requires no further work. May or may not represent a non-trivial batch.
    Complete(Option<(B, Option<(B, B)>)>),
}

impl<B: Batch> MergeVariant<B> {

    /// Completes and extracts the batch, unless structurally empty.
    ///
    /// The result is either `None`, for structurally empty batches,
    /// or a batch and optionally input batches from which it derived.
    fn complete(mut self) -> Option<(B, Option<(B, B)>)> {
        let mut fuel = isize::max_value();
        self.work(&mut fuel);
        if let MergeVariant::Complete(batch) = self { batch }
        else { panic!("Failed to complete a merge!"); }
    }
    /// Applies some amount of work, potentially completing the merge.
    ///
    /// If the work completes the merge, the source batches are retained
    /// alongside the result, so that the caller can manage the released resources.
    fn work(&mut self, fuel: &mut isize) {
        let variant = std::mem::replace(self, MergeVariant::Complete(None));
        if let MergeVariant::InProgress(b1,b2,mut merge) = variant {
            merge.work(&b1,&b2,fuel);
            if *fuel > 0 {
                *self = MergeVariant::Complete(Some((merge.done(), Some((b1,b2)))));
            }
            else {
                *self = MergeVariant::InProgress(b1,b2,merge);
            }
        }
        else {
            *self = variant;
        }
    }
}