Skip to main content

lsm_tree/compaction/leveled/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5#[cfg(test)]
6#[allow(
7    clippy::unwrap_used,
8    clippy::indexing_slicing,
9    clippy::useless_vec,
10    clippy::unnecessary_map_or,
11    reason = "test code"
12)]
13mod test;
14
15use super::{Choice, CompactionStrategy, Input as CompactionInput};
16use crate::{
17    HashSet, TableId,
18    compaction::state::{CompactionState, hidden_set::HiddenSet},
19    config::Config,
20    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
21    table::{Table, util::aggregate_run_key_range},
22    version::{Level, Version, run::Ranged},
23};
24
25/// Tries to find the most optimal compaction set from one level into the other.
26///
27/// Scans all runs in both levels to handle transient multi-run states from
28/// multi-level compaction (#108). See #122 Part 3.
29fn pick_minimal_compaction(
30    curr_level: &Level,
31    next_level: &Level,
32    hidden_set: &HiddenSet,
33    _overshoot: u64,
34    table_base_size: u64,
35    cmp: &dyn crate::comparator::UserComparator,
36) -> Option<(HashSet<TableId>, bool)> {
37    // NOTE: Find largest trivial move (if it exists)
38    // Check all runs in curr_level for a window that doesn't overlap ANY run
39    // in next_level.
40    for curr_run in curr_level.iter() {
41        if let Some(window) = curr_run.shrinking_windows().find(|window| {
42            if hidden_set.is_blocked(window.iter().map(Table::id)) {
43                return false;
44            }
45
46            if next_level.is_empty() {
47                return true;
48            }
49
50            let key_range = aggregate_run_key_range(window);
51
52            // Must not overlap ANY run in the next level
53            next_level
54                .iter()
55                .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
56        }) {
57            let ids = window.iter().map(Table::id).collect();
58            return Some((ids, true));
59        }
60    }
61
62    // NOTE: Look for merges
63    // Iterate windows across all runs in next_level, pull in from all runs
64    // in curr_level.
65    if next_level.is_empty() {
66        return None;
67    }
68
69    next_level
70        .iter()
71        .flat_map(|run| {
72            // Cap per-run windows at 50x table_base_size. take_while is safe
73            // here because growing_windows within a single run are monotonically
74            // increasing in size — once one exceeds the cap, all subsequent will too.
75            run.growing_windows().take_while(|window| {
76                let size = window.iter().map(Table::file_size).sum::<u64>();
77                size <= (50 * table_base_size)
78            })
79        })
80        .filter_map(|window| {
81            if hidden_set.is_blocked(window.iter().map(Table::id)) {
82                return None;
83            }
84
85            let key_range = aggregate_run_key_range(window);
86
87            // Pull in contained tables from ALL runs in curr_level
88            let curr_level_pull_in: Vec<&Table> = curr_level
89                .iter()
90                .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
91                .collect();
92
93            let curr_level_size = curr_level_pull_in
94                .iter()
95                .map(|t| Table::file_size(t))
96                .sum::<u64>();
97
98            if curr_level_size == 0 {
99                return None;
100            }
101
102            if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
103                return None;
104            }
105
106            let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
107            let compaction_bytes = curr_level_size + next_level_size;
108
109            Some((window, curr_level_pull_in, compaction_bytes))
110        })
111        .min_by_key(|(_, _, bytes)| *bytes)
112        .map(|(window, curr_level_pull_in, _)| {
113            let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
114            ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
115            (ids, false)
116        })
117}
118
119#[doc(hidden)]
120pub const NAME: &str = "LeveledCompaction";
121
122/// Leveled compaction strategy (LCS)
123///
124/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
125///
126/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^(n - 1)` tables.
127///
128/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
129///
130/// LCS is the recommended compaction strategy to use.
131///
132/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
133#[derive(Clone)]
134pub struct Strategy {
135    l0_threshold: u8,
136
137    /// The target table size as disk (possibly compressed).
138    target_size: u64,
139
140    /// Size ratio between levels of the LSM tree (a.k.a fanout, growth rate)
141    level_ratio_policy: Vec<f32>,
142
143    /// When true, dynamically sizes levels based on the actual data in
144    /// the last non-empty level, reducing space amplification to ~1.1x.
145    ///
146    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
147    ///
148    /// Default = false (static leveling).
149    dynamic: bool,
150
151    /// When true, enables multi-level compaction: if L0→L1 is chosen but
152    /// L1 is already oversized, compacts L0+L1→L2 directly in one pass
153    /// to avoid a write-then-rewrite cycle.
154    ///
155    /// Default = false.
156    multi_level: bool,
157}
158
159impl Default for Strategy {
160    fn default() -> Self {
161        Self {
162            l0_threshold: 4,
163            target_size:/* 64 MiB */ 64 * 1_024 * 1_024,
164            level_ratio_policy: vec![10.0],
165            dynamic: false,
166            multi_level: false,
167        }
168    }
169}
170
171impl Strategy {
172    /// Sets the growth ratio between levels.
173    ///
174    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
175    ///
176    /// Default = [10.0]
177    #[must_use]
178    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
179        self.level_ratio_policy = policy;
180        self
181    }
182
183    /// Sets the L0 threshold.
184    ///
185    /// When the number of tables in L0 reaches this threshold,
186    /// they are merged into L1.
187    ///
188    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
189    ///
190    /// Default = 4
191    #[must_use]
192    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
193        self.l0_threshold = threshold;
194        self
195    }
196
197    /// Sets the table target size on disk (possibly compressed).
198    ///
199    /// Same as `target_file_size_base` in `RocksDB`.
200    ///
201    /// Default = 64 MiB
202    #[must_use]
203    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
204        self.target_size = bytes;
205        self
206    }
207
208    /// Enables dynamic level sizing based on actual data in the last level.
209    ///
210    /// When enabled, level target sizes are computed top-down from the actual
211    /// size of the last non-empty level, divided by the ratio at each step.
212    /// This reduces space amplification to ~1.1x while keeping write
213    /// amplification comparable to static leveling.
214    ///
215    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
216    ///
217    /// Default = false
218    #[must_use]
219    pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
220        self.dynamic = enabled;
221        self
222    }
223
224    /// Enables multi-level compaction optimization.
225    ///
226    /// When L0→L1 compaction is selected but L1 already exceeds its target
227    /// size, this option allows compacting L0+L1 directly into L2 in one
228    /// pass, avoiding the write-then-rewrite cycle that would otherwise
229    /// occur.
230    ///
231    /// Default = false
232    #[must_use]
233    pub fn with_multi_level(mut self, enabled: bool) -> Self {
234        self.multi_level = enabled;
235        self
236    }
237
238    /// Calculates the size of L1.
239    fn level_base_size(&self) -> u64 {
240        self.target_size * u64::from(self.l0_threshold)
241    }
242
243    /// Calculates the level target size.
244    ///
245    /// L1 = `level_base_size`
246    ///
247    /// L2 = `level_base_size * ratio`
248    ///
249    /// L3 = `level_base_size * ratio * ratio`
250    ///
251    /// ...
252    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
253        assert!(
254            canonical_level_idx >= 1,
255            "level_target_size does not apply to L0",
256        );
257
258        if canonical_level_idx == 1 {
259            // u64::from(self.target_size)
260            self.level_base_size()
261        } else {
262            #[expect(
263                clippy::cast_precision_loss,
264                reason = "precision loss is acceptable for level size calculations"
265            )]
266            let mut size = self.level_base_size() as f32;
267
268            // NOTE: Minus 2 because |{L0, L1}|
269            for idx in 0..=(canonical_level_idx - 2) {
270                let ratio = self
271                    .level_ratio_policy
272                    .get(usize::from(idx))
273                    .copied()
274                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
275
276                size *= ratio;
277            }
278
279            #[expect(
280                clippy::cast_possible_truncation,
281                clippy::cast_sign_loss,
282                reason = "size is always positive and will never even come close to u64::MAX"
283            )]
284            {
285                size as u64
286            }
287        }
288    }
289
290    /// Computes level target sizes for all 7 levels.
291    ///
292    /// In static mode, uses the standard exponential formula.
293    /// In dynamic mode, derives targets from the actual size of the last
294    /// non-empty level, dividing backwards by the ratio at each step.
295    /// Falls back to static mode when the tree is small (dynamic L1 target
296    /// would be less than `level_base_size`).
297    fn compute_level_targets(
298        &self,
299        version: &Version,
300        level_shift: usize,
301        state: &CompactionState,
302    ) -> [u64; 7] {
303        let mut targets = [u64::MAX; 7];
304
305        // L0 target is not size-based (it's count-based), so leave at MAX
306        targets[0] = u64::MAX;
307
308        if self.dynamic {
309            // Find the last non-empty level (Lmax) and its actual size.
310            // Iterate forward and keep track of the last non-empty level
311            // since iter_levels() does not support DoubleEndedIterator.
312            let mut lmax_idx = None;
313
314            for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
315                if !lvl.is_empty() {
316                    lmax_idx = Some(idx);
317                }
318            }
319
320            if let Some(lmax_idx) = lmax_idx {
321                #[expect(
322                    clippy::expect_used,
323                    reason = "lmax_idx was found by iterating levels, so it must exist"
324                )]
325                let lmax_level = version.level(lmax_idx).expect("level should exist");
326
327                let lmax_size: u64 = lmax_level
328                    .iter()
329                    .flat_map(|run| run.iter())
330                    .filter(|table| !state.hidden_set().is_hidden(table.id()))
331                    .map(Table::file_size)
332                    .sum();
333
334                // Work backwards from Lmax
335                if let Some(slot) = targets.get_mut(lmax_idx) {
336                    *slot = lmax_size;
337                }
338
339                #[expect(
340                    clippy::cast_precision_loss,
341                    reason = "precision loss is acceptable for level size calculations"
342                )]
343                let mut current_target = lmax_size as f64;
344
345                // Only backfill down to the effective L1 (accounting for
346                // level_shift), not to physical level 1, so we don't
347                // overwrite slots below the shifted canonical L1.
348                let dynamic_l1_idx = level_shift + 1;
349
350                for idx in (dynamic_l1_idx..lmax_idx).rev() {
351                    let canonical = idx - level_shift;
352                    // In the forward formula, target(k+1)/target(k) = ratio[k-1],
353                    // so backwards: target(k) = target(k+1) / ratio[k-1]
354                    let ratio_idx = canonical.saturating_sub(1);
355                    let ratio = f64::from(
356                        self.level_ratio_policy
357                            .get(ratio_idx)
358                            .copied()
359                            .unwrap_or_else(|| {
360                                self.level_ratio_policy.last().copied().unwrap_or(10.0)
361                            }),
362                    );
363
364                    // Guard against invalid ratios (zero, negative, NaN, infinite).
365                    // Fall back to static targets instead of leaving partial
366                    // dynamic targets with u64::MAX in lower-level slots.
367                    if !ratio.is_finite() || ratio <= 0.0 {
368                        return self.compute_static_targets(level_shift);
369                    }
370
371                    current_target /= ratio;
372
373                    #[expect(
374                        clippy::cast_possible_truncation,
375                        clippy::cast_sign_loss,
376                        reason = "target is always positive"
377                    )]
378                    if let Some(slot) = targets.get_mut(idx) {
379                        *slot = current_target as u64;
380                    }
381                }
382
383                // Fallback: if dynamic L1 target is too small, use static.
384                // Compare the shifted L1 slot, not physical slot 1.
385                let static_l1 = self.level_base_size();
386                if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
387                    return self.compute_static_targets(level_shift);
388                }
389
390                return targets;
391            }
392        }
393
394        self.compute_static_targets(level_shift)
395    }
396
397    /// Computes static (exponential) level targets.
398    fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
399        let mut targets = [u64::MAX; 7];
400
401        for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
402            if idx <= level_shift {
403                continue; // stays at u64::MAX
404            }
405            #[expect(
406                clippy::cast_possible_truncation,
407                reason = "level index is bounded by level count (7)"
408            )]
409            {
410                *slot = self.level_target_size((idx - level_shift) as u8);
411            }
412        }
413
414        targets
415    }
416}
417
418impl CompactionStrategy for Strategy {
419    fn get_name(&self) -> &'static str {
420        NAME
421    }
422
423    fn get_config(&self) -> Vec<crate::KvPair> {
424        vec![
425            (
426                crate::UserKey::from("leveled_l0_threshold"),
427                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
428            ),
429            (
430                crate::UserKey::from("leveled_target_size"),
431                crate::UserValue::from(self.target_size.to_le_bytes()),
432            ),
433            (
434                crate::UserKey::from("leveled_level_ratio_policy"),
435                crate::UserValue::from({
436                    use byteorder::{LittleEndian, WriteBytesExt};
437
438                    let mut v = vec![];
439
440                    #[expect(
441                        clippy::expect_used,
442                        clippy::cast_possible_truncation,
443                        reason = "writing into Vec should not fail; policies have length of 255 max"
444                    )]
445                    v.write_u8(self.level_ratio_policy.len() as u8)
446                        .expect("cannot fail");
447
448                    for &f in &self.level_ratio_policy {
449                        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
450                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
451                    }
452
453                    v
454                }),
455            ),
456            (
457                crate::UserKey::from("leveled_dynamic"),
458                crate::UserValue::from([u8::from(self.dynamic)]),
459            ),
460            (
461                crate::UserKey::from("leveled_multi_level"),
462                crate::UserValue::from([u8::from(self.multi_level)]),
463            ),
464        ]
465    }
466
467    #[expect(clippy::too_many_lines)]
468    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
469        assert!(version.level_count() == 7, "should have exactly 7 levels");
470        let cmp = config.comparator.as_ref();
471
472        // Trivial move into Lmax
473        'trivial_lmax: {
474            #[expect(
475                clippy::expect_used,
476                reason = "level 0 is guaranteed to exist in a valid version"
477            )]
478            let l0 = version.level(0).expect("first level should exist");
479
480            if !l0.is_empty() && l0.is_disjoint() {
481                let lmax_index = version.level_count() - 1;
482
483                if (1..lmax_index).any(|idx| {
484                    #[expect(
485                        clippy::expect_used,
486                        reason = "levels within level_count are guaranteed to exist"
487                    )]
488                    let level = version.level(idx).expect("level should exist");
489                    !level.is_empty()
490                }) {
491                    // There are intermediary levels with data, cannot trivially move to Lmax
492                    break 'trivial_lmax;
493                }
494
495                #[expect(
496                    clippy::expect_used,
497                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
498                )]
499                let lmax = version.level(lmax_index).expect("last level should exist");
500
501                if !lmax
502                    .aggregate_key_range_cmp(cmp)
503                    .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
504                {
505                    return Choice::Move(CompactionInput {
506                        table_ids: l0.list_ids(),
507                        #[expect(
508                            clippy::cast_possible_truncation,
509                            reason = "level count is at most 7, fits in u8"
510                        )]
511                        dest_level: lmax_index as u8,
512                        canonical_level: 1,
513                        target_size: self.target_size,
514                    });
515                }
516            }
517        }
518
519        // Find the level that corresponds to L1
520        #[expect(clippy::map_unwrap_or)]
521        let first_non_empty_level = version
522            .iter_levels()
523            .enumerate()
524            .skip(1)
525            .find(|(_, lvl)| !lvl.is_empty())
526            .map(|(idx, _)| idx)
527            .unwrap_or_else(|| version.level_count() - 1);
528
529        let mut canonical_l1_idx = first_non_empty_level;
530
531        // Number of levels we have to shift to get from the actual level idx to the canonical
532        let mut level_shift = canonical_l1_idx - 1;
533
534        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
535            let need_new_l1 = version
536                .iter_levels()
537                .enumerate()
538                .skip(1)
539                .filter(|(_, lvl)| !lvl.is_empty())
540                .all(|(idx, level)| {
541                    let level_size = level
542                        .iter()
543                        .flat_map(|x| x.iter())
544                        // NOTE: Take bytes that are already being compacted into account,
545                        // otherwise we may be overcompensating
546                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
547                        .map(Table::file_size)
548                        .sum::<u64>();
549
550                    #[expect(
551                        clippy::cast_possible_truncation,
552                        reason = "level index is bounded by level count (7, technically 255)"
553                    )]
554                    let target_size = self.level_target_size((idx - level_shift) as u8);
555
556                    level_size > target_size
557                });
558
559            // Move up L1 one level if all current levels are at capacity
560            if need_new_l1 {
561                canonical_l1_idx -= 1;
562                level_shift -= 1;
563            }
564        }
565
566        // Trivial move into L1
567        'trivial: {
568            let first_level = version.l0();
569            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);
570
571            if first_level.run_count() == 1 {
572                if version.level_is_busy(0, state.hidden_set())
573                    || version.level_is_busy(target_level_idx, state.hidden_set())
574                {
575                    break 'trivial;
576                }
577
578                let Some(target_level) = &version.level(target_level_idx) else {
579                    break 'trivial;
580                };
581
582                if target_level.run_count() != 1 {
583                    break 'trivial;
584                }
585
586                let key_range = first_level.aggregate_key_range_cmp(cmp);
587
588                // Get overlapping tables in next level
589                let get_overlapping = target_level
590                    .iter()
591                    .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
592                    .map(Table::id)
593                    .next();
594
595                if get_overlapping.is_none() && first_level.is_disjoint() {
596                    #[expect(
597                        clippy::cast_possible_truncation,
598                        reason = "level index is bounded by level count (7)"
599                    )]
600                    return Choice::Move(CompactionInput {
601                        table_ids: first_level.list_ids(),
602                        dest_level: target_level_idx as u8,
603                        canonical_level: 1,
604                        target_size: self.target_size,
605                    });
606                }
607            }
608        }
609
610        // Intra-L0 compaction: merge multiple L0 runs into a single run within L0
611        // when table count is below the L0→L1 threshold
612        {
613            let first_level = version.l0();
614
615            if first_level.run_count() > 1
616                && first_level.table_count() < usize::from(self.l0_threshold)
617                && !version.level_is_busy(0, state.hidden_set())
618            {
619                return Choice::Merge(CompactionInput {
620                    table_ids: first_level.list_ids(),
621                    dest_level: 0,
622                    canonical_level: 0,
623                    target_size: self.target_size,
624                });
625            }
626        }
627
628        // Compute level targets (supports both static and dynamic modes)
629        let level_targets = self.compute_level_targets(version, level_shift, state);
630
631        // Scoring
632        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];
633
634        {
635            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
636            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
637            // decisions can prioritize tables that would free the most reclaimable values.
638
639            // Score first level
640            let first_level = version.l0();
641
642            if first_level.table_count() >= usize::from(self.l0_threshold) {
643                #[expect(
644                    clippy::cast_precision_loss,
645                    reason = "precision loss is acceptable for scoring calculations"
646                )]
647                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
648                scores[0] = (ratio, 0);
649            }
650
651            // Score L1+
652            for (idx, level) in version.iter_levels().enumerate().skip(1) {
653                if level.is_empty() {
654                    continue;
655                }
656
657                let level_size = level
658                    .iter()
659                    .flat_map(|x| x.iter())
660                    // NOTE: Take bytes that are already being compacted into account,
661                    // otherwise we may be overcompensating
662                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
663                    .map(Table::file_size)
664                    .sum::<u64>();
665
666                // NOTE: We check for level length above
667                #[expect(clippy::indexing_slicing)]
668                let target_size = level_targets[idx];
669
670                #[expect(
671                    clippy::indexing_slicing,
672                    reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
673                )]
674                if level_size > target_size {
675                    #[expect(
676                        clippy::cast_precision_loss,
677                        reason = "precision loss is acceptable for scoring calculations"
678                    )]
679                    let score = level_size as f64 / target_size as f64;
680                    scores[idx] = (score, level_size - target_size);
681
682                    // NOTE: Force a trivial move
683                    if version
684                        .level(idx + 1)
685                        .is_some_and(|next_level| next_level.is_empty())
686                    {
687                        scores[idx] = (99.99, 999);
688                    }
689                }
690            }
691
692            // NOTE: Never score Lmax
693            {
694                scores[6] = (0.0, 0);
695            }
696        }
697
698        // Choose compaction
699        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
700        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
701            .into_iter()
702            .enumerate()
703            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
704                score_a
705                    .partial_cmp(score_b)
706                    .unwrap_or(std::cmp::Ordering::Equal)
707            })
708            .expect("should have highest score somewhere");
709
710        if score < 1.0 {
711            return Choice::DoNothing;
712        }
713
714        // We choose L0->L1 compaction
715        if level_idx_with_highest_score == 0 {
716            let Some(first_level) = version.level(0) else {
717                return Choice::DoNothing;
718            };
719
720            if version.level_is_busy(0, state.hidden_set())
721                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
722            {
723                return Choice::DoNothing;
724            }
725
726            let Some(target_level) = &version.level(canonical_l1_idx) else {
727                return Choice::DoNothing;
728            };
729
730            let mut table_ids = first_level.list_ids();
731
732            let key_range = first_level.aggregate_key_range_cmp(cmp);
733
734            // Get overlapping tables in next level
735            let target_level_overlapping_table_ids: Vec<_> = target_level
736                .iter()
737                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
738                .map(Table::id)
739                .collect();
740
741            table_ids.extend(&target_level_overlapping_table_ids);
742
743            // Multi-level compaction: if L1 is already oversized, skip it
744            // and compact L0+L1 directly into L2 in one pass.
745            // NOTE: Currently triggers on pre-compaction L1 score. A future
746            // improvement could use projected post-compaction bytes to also
747            // catch cases where L1 is close to its target and this batch
748            // would push it over.
749            if self.multi_level {
750                let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
751                let l2_idx = canonical_l1_idx + 1;
752
753                if l1_score > 1.0
754                    && l2_idx < version.level_count()
755                    && !version.level_is_busy(l2_idx, state.hidden_set())
756                    && let Some(l2) = version.level(l2_idx)
757                {
758                    // Include ALL L1 tables (we're emptying L1 into L2)
759                    table_ids.extend(target_level.list_ids());
760
761                    // Include overlapping L2 tables — query per merged
762                    // interval instead of one coarse aggregate (#72).
763                    // An aggregate across disjoint tables (e.g. [a,d] and
764                    // [x,z] → [a,z]) covers gaps and pulls in L2 tables
765                    // that don't actually overlap any input table.
766                    //
767                    // Merge input key ranges into disjoint intervals first
768                    // to reduce redundant queries when L0 tables overlap
769                    // (#122 Part 2). Sort by comparator-min, then coalesce.
770                    {
771                        let mut input_ranges: Vec<_> = target_level
772                            .iter()
773                            .chain(first_level.iter())
774                            .flat_map(|run| run.iter())
775                            .map(|t| t.key_range().clone())
776                            .collect();
777                        input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));
778
779                        let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);
780
781                        for run in l2.iter() {
782                            for interval in &merged {
783                                for l2t in run.get_overlapping_cmp(interval, cmp) {
784                                    table_ids.insert(Table::id(l2t));
785                                }
786                            }
787                        }
788                    }
789
790                    #[expect(
791                        clippy::cast_possible_truncation,
792                        reason = "level index is bounded by level count (7)"
793                    )]
794                    return Choice::Merge(CompactionInput {
795                        table_ids,
796                        dest_level: l2_idx as u8,
797                        canonical_level: 2,
798                        target_size: self.target_size,
799                    });
800                }
801            }
802
803            #[expect(
804                clippy::cast_possible_truncation,
805                reason = "level index is bounded by level count (7, technically 255)"
806            )]
807            let choice = CompactionInput {
808                table_ids,
809                dest_level: canonical_l1_idx as u8,
810                canonical_level: 1,
811                target_size: self.target_size,
812            };
813
814            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
815                return Choice::Move(choice);
816            }
817            return Choice::Merge(choice);
818        }
819
820        // We choose L1+ compaction instead
821
822        // NOTE: Level count is 255 max
823        #[expect(clippy::cast_possible_truncation)]
824        let curr_level_index = level_idx_with_highest_score as u8;
825
826        let next_level_index = curr_level_index + 1;
827
828        let Some(level) = version.level(level_idx_with_highest_score) else {
829            return Choice::DoNothing;
830        };
831
832        let Some(next_level) = version.level(next_level_index as usize) else {
833            return Choice::DoNothing;
834        };
835
836        // pick_minimal_compaction scans all runs in both levels, handling
837        // transient multi-run states from multi-level compaction (#108, #122).
838        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
839            level,
840            next_level,
841            state.hidden_set(),
842            overshoot_bytes,
843            self.target_size,
844            cmp,
845        ) else {
846            return Choice::DoNothing;
847        };
848
849        #[expect(
850            clippy::cast_possible_truncation,
851            reason = "level shift is bounded by level count (7, technically 255)"
852        )]
853        let choice = CompactionInput {
854            table_ids,
855            dest_level: next_level_index,
856            canonical_level: next_level_index - (level_shift as u8),
857            target_size: self.target_size,
858        };
859
860        if can_trivial_move && level.is_disjoint() {
861            return Choice::Move(choice);
862        }
863        Choice::Merge(choice)
864    }
865}