Skip to main content

lsm_tree/compaction/leveled/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5#[cfg(test)]
6mod test;
7
8use super::{Choice, CompactionStrategy, Input as CompactionInput};
9use crate::{
10    compaction::state::{hidden_set::HiddenSet, CompactionState},
11    config::Config,
12    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
13    table::{util::aggregate_run_key_range, Table},
14    version::{run::Ranged, Level, Version},
15    HashSet, TableId,
16};
17
18/// Tries to find the most optimal compaction set from one level into the other.
19///
20/// Scans all runs in both levels to handle transient multi-run states from
21/// multi-level compaction (#108). See #122 Part 3.
22fn pick_minimal_compaction(
23    curr_level: &Level,
24    next_level: &Level,
25    hidden_set: &HiddenSet,
26    _overshoot: u64,
27    table_base_size: u64,
28    cmp: &dyn crate::comparator::UserComparator,
29) -> Option<(HashSet<TableId>, bool)> {
30    // NOTE: Find largest trivial move (if it exists)
31    // Check all runs in curr_level for a window that doesn't overlap ANY run
32    // in next_level.
33    for curr_run in curr_level.iter() {
34        if let Some(window) = curr_run.shrinking_windows().find(|window| {
35            if hidden_set.is_blocked(window.iter().map(Table::id)) {
36                return false;
37            }
38
39            if next_level.is_empty() {
40                return true;
41            }
42
43            let key_range = aggregate_run_key_range(window);
44
45            // Must not overlap ANY run in the next level
46            next_level
47                .iter()
48                .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
49        }) {
50            let ids = window.iter().map(Table::id).collect();
51            return Some((ids, true));
52        }
53    }
54
55    // NOTE: Look for merges
56    // Iterate windows across all runs in next_level, pull in from all runs
57    // in curr_level.
58    if next_level.is_empty() {
59        return None;
60    }
61
62    next_level
63        .iter()
64        .flat_map(|run| {
65            // Cap per-run windows at 50x table_base_size. take_while is safe
66            // here because growing_windows within a single run are monotonically
67            // increasing in size — once one exceeds the cap, all subsequent will too.
68            run.growing_windows().take_while(|window| {
69                let size = window.iter().map(Table::file_size).sum::<u64>();
70                size <= (50 * table_base_size)
71            })
72        })
73        .filter_map(|window| {
74            if hidden_set.is_blocked(window.iter().map(Table::id)) {
75                return None;
76            }
77
78            let key_range = aggregate_run_key_range(window);
79
80            // Pull in contained tables from ALL runs in curr_level
81            let curr_level_pull_in: Vec<&Table> = curr_level
82                .iter()
83                .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
84                .collect();
85
86            let curr_level_size = curr_level_pull_in
87                .iter()
88                .map(|t| Table::file_size(t))
89                .sum::<u64>();
90
91            if curr_level_size == 0 {
92                return None;
93            }
94
95            if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
96                return None;
97            }
98
99            let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
100            let compaction_bytes = curr_level_size + next_level_size;
101
102            Some((window, curr_level_pull_in, compaction_bytes))
103        })
104        .min_by_key(|(_, _, bytes)| *bytes)
105        .map(|(window, curr_level_pull_in, _)| {
106            let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
107            ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
108            (ids, false)
109        })
110}
111
/// Name of the leveled compaction strategy, as returned by
/// `CompactionStrategy::get_name` for [`Strategy`].
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
114
/// Leveled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// With a uniform ratio, each level Ln for n >= 2 has a target size of
/// `level_base_size * ratio^(n - 1)` bytes, where `level_base_size` is the
/// table target size multiplied by the L0 threshold.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone)]
pub struct Strategy {
    /// Number of tables in L0 at which L0 gets compacted into L1.
    ///
    /// Also used as the multiplier that turns the table target size into
    /// the target size of L1.
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a fanout, growth rate)
    ///
    /// Entry 0 is the ratio between L1 and L2, entry 1 between L2 and L3,
    /// and so on. If there are fewer entries than level steps, the last
    /// entry is reused; an empty policy falls back to 10.0.
    level_ratio_policy: Vec<f32>,

    /// When true, dynamically sizes levels based on the actual data in
    /// the last non-empty level, reducing space amplification to ~1.1x.
    ///
    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
    ///
    /// Default = false (static leveling).
    dynamic: bool,

    /// When true, enables multi-level compaction: if L0→L1 is chosen but
    /// L1 is already oversized, compacts L0+L1→L2 directly in one pass
    /// to avoid a write-then-rewrite cycle.
    ///
    /// Default = false.
    multi_level: bool,
}
151
152impl Default for Strategy {
153    fn default() -> Self {
154        Self {
155            l0_threshold: 4,
156            target_size:/* 64 MiB */ 64 * 1_024 * 1_024,
157            level_ratio_policy: vec![10.0],
158            dynamic: false,
159            multi_level: false,
160        }
161    }
162}
163
164impl Strategy {
165    /// Sets the growth ratio between levels.
166    ///
167    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
168    ///
169    /// Default = [10.0]
170    #[must_use]
171    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
172        self.level_ratio_policy = policy;
173        self
174    }
175
176    /// Sets the L0 threshold.
177    ///
178    /// When the number of tables in L0 reaches this threshold,
179    /// they are merged into L1.
180    ///
181    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
182    ///
183    /// Default = 4
184    #[must_use]
185    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
186        self.l0_threshold = threshold;
187        self
188    }
189
190    /// Sets the table target size on disk (possibly compressed).
191    ///
192    /// Same as `target_file_size_base` in `RocksDB`.
193    ///
194    /// Default = 64 MiB
195    #[must_use]
196    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
197        self.target_size = bytes;
198        self
199    }
200
201    /// Enables dynamic level sizing based on actual data in the last level.
202    ///
203    /// When enabled, level target sizes are computed top-down from the actual
204    /// size of the last non-empty level, divided by the ratio at each step.
205    /// This reduces space amplification to ~1.1x while keeping write
206    /// amplification comparable to static leveling.
207    ///
208    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
209    ///
210    /// Default = false
211    #[must_use]
212    pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
213        self.dynamic = enabled;
214        self
215    }
216
217    /// Enables multi-level compaction optimization.
218    ///
219    /// When L0→L1 compaction is selected but L1 already exceeds its target
220    /// size, this option allows compacting L0+L1 directly into L2 in one
221    /// pass, avoiding the write-then-rewrite cycle that would otherwise
222    /// occur.
223    ///
224    /// Default = false
225    #[must_use]
226    pub fn with_multi_level(mut self, enabled: bool) -> Self {
227        self.multi_level = enabled;
228        self
229    }
230
231    /// Calculates the size of L1.
232    fn level_base_size(&self) -> u64 {
233        self.target_size * u64::from(self.l0_threshold)
234    }
235
236    /// Calculates the level target size.
237    ///
238    /// L1 = `level_base_size`
239    ///
240    /// L2 = `level_base_size * ratio`
241    ///
242    /// L3 = `level_base_size * ratio * ratio`
243    ///
244    /// ...
245    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
246        assert!(
247            canonical_level_idx >= 1,
248            "level_target_size does not apply to L0",
249        );
250
251        if canonical_level_idx == 1 {
252            // u64::from(self.target_size)
253            self.level_base_size()
254        } else {
255            #[expect(
256                clippy::cast_precision_loss,
257                reason = "precision loss is acceptable for level size calculations"
258            )]
259            let mut size = self.level_base_size() as f32;
260
261            // NOTE: Minus 2 because |{L0, L1}|
262            for idx in 0..=(canonical_level_idx - 2) {
263                let ratio = self
264                    .level_ratio_policy
265                    .get(usize::from(idx))
266                    .copied()
267                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
268
269                size *= ratio;
270            }
271
272            #[expect(
273                clippy::cast_possible_truncation,
274                clippy::cast_sign_loss,
275                reason = "size is always positive and will never even come close to u64::MAX"
276            )]
277            {
278                size as u64
279            }
280        }
281    }
282
283    /// Computes level target sizes for all 7 levels.
284    ///
285    /// In static mode, uses the standard exponential formula.
286    /// In dynamic mode, derives targets from the actual size of the last
287    /// non-empty level, dividing backwards by the ratio at each step.
288    /// Falls back to static mode when the tree is small (dynamic L1 target
289    /// would be less than `level_base_size`).
290    fn compute_level_targets(
291        &self,
292        version: &Version,
293        level_shift: usize,
294        state: &CompactionState,
295    ) -> [u64; 7] {
296        let mut targets = [u64::MAX; 7];
297
298        // L0 target is not size-based (it's count-based), so leave at MAX
299        targets[0] = u64::MAX;
300
301        if self.dynamic {
302            // Find the last non-empty level (Lmax) and its actual size.
303            // Iterate forward and keep track of the last non-empty level
304            // since iter_levels() does not support DoubleEndedIterator.
305            let mut lmax_idx = None;
306
307            for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
308                if !lvl.is_empty() {
309                    lmax_idx = Some(idx);
310                }
311            }
312
313            if let Some(lmax_idx) = lmax_idx {
314                #[expect(
315                    clippy::expect_used,
316                    reason = "lmax_idx was found by iterating levels, so it must exist"
317                )]
318                let lmax_level = version.level(lmax_idx).expect("level should exist");
319
320                let lmax_size: u64 = lmax_level
321                    .iter()
322                    .flat_map(|run| run.iter())
323                    .filter(|table| !state.hidden_set().is_hidden(table.id()))
324                    .map(Table::file_size)
325                    .sum();
326
327                // Work backwards from Lmax
328                if let Some(slot) = targets.get_mut(lmax_idx) {
329                    *slot = lmax_size;
330                }
331
332                #[expect(
333                    clippy::cast_precision_loss,
334                    reason = "precision loss is acceptable for level size calculations"
335                )]
336                let mut current_target = lmax_size as f64;
337
338                // Only backfill down to the effective L1 (accounting for
339                // level_shift), not to physical level 1, so we don't
340                // overwrite slots below the shifted canonical L1.
341                let dynamic_l1_idx = level_shift + 1;
342
343                for idx in (dynamic_l1_idx..lmax_idx).rev() {
344                    let canonical = idx - level_shift;
345                    // In the forward formula, target(k+1)/target(k) = ratio[k-1],
346                    // so backwards: target(k) = target(k+1) / ratio[k-1]
347                    let ratio_idx = canonical.saturating_sub(1);
348                    let ratio = f64::from(
349                        self.level_ratio_policy
350                            .get(ratio_idx)
351                            .copied()
352                            .unwrap_or_else(|| {
353                                self.level_ratio_policy.last().copied().unwrap_or(10.0)
354                            }),
355                    );
356
357                    // Guard against invalid ratios (zero, negative, NaN, infinite).
358                    // Fall back to static targets instead of leaving partial
359                    // dynamic targets with u64::MAX in lower-level slots.
360                    if !ratio.is_finite() || ratio <= 0.0 {
361                        return self.compute_static_targets(level_shift);
362                    }
363
364                    current_target /= ratio;
365
366                    #[expect(
367                        clippy::cast_possible_truncation,
368                        clippy::cast_sign_loss,
369                        reason = "target is always positive"
370                    )]
371                    if let Some(slot) = targets.get_mut(idx) {
372                        *slot = current_target as u64;
373                    }
374                }
375
376                // Fallback: if dynamic L1 target is too small, use static.
377                // Compare the shifted L1 slot, not physical slot 1.
378                let static_l1 = self.level_base_size();
379                if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
380                    return self.compute_static_targets(level_shift);
381                }
382
383                return targets;
384            }
385        }
386
387        self.compute_static_targets(level_shift)
388    }
389
390    /// Computes static (exponential) level targets.
391    fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
392        let mut targets = [u64::MAX; 7];
393
394        for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
395            if idx <= level_shift {
396                continue; // stays at u64::MAX
397            }
398            #[expect(
399                clippy::cast_possible_truncation,
400                reason = "level index is bounded by level count (7)"
401            )]
402            {
403                *slot = self.level_target_size((idx - level_shift) as u8);
404            }
405        }
406
407        targets
408    }
409}
410
411impl CompactionStrategy for Strategy {
412    fn get_name(&self) -> &'static str {
413        NAME
414    }
415
416    fn get_config(&self) -> Vec<crate::KvPair> {
417        vec![
418            (
419                crate::UserKey::from("leveled_l0_threshold"),
420                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
421            ),
422            (
423                crate::UserKey::from("leveled_target_size"),
424                crate::UserValue::from(self.target_size.to_le_bytes()),
425            ),
426            (
427                crate::UserKey::from("leveled_level_ratio_policy"),
428                crate::UserValue::from({
429                    use byteorder::{LittleEndian, WriteBytesExt};
430
431                    let mut v = vec![];
432
433                    #[expect(
434                        clippy::expect_used,
435                        clippy::cast_possible_truncation,
436                        reason = "writing into Vec should not fail; policies have length of 255 max"
437                    )]
438                    v.write_u8(self.level_ratio_policy.len() as u8)
439                        .expect("cannot fail");
440
441                    for &f in &self.level_ratio_policy {
442                        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
443                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
444                    }
445
446                    v
447                }),
448            ),
449            (
450                crate::UserKey::from("leveled_dynamic"),
451                crate::UserValue::from([u8::from(self.dynamic)]),
452            ),
453            (
454                crate::UserKey::from("leveled_multi_level"),
455                crate::UserValue::from([u8::from(self.multi_level)]),
456            ),
457        ]
458    }
459
460    #[expect(clippy::too_many_lines)]
461    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
462        assert!(version.level_count() == 7, "should have exactly 7 levels");
463        let cmp = config.comparator.as_ref();
464
465        // Trivial move into Lmax
466        'trivial_lmax: {
467            #[expect(
468                clippy::expect_used,
469                reason = "level 0 is guaranteed to exist in a valid version"
470            )]
471            let l0 = version.level(0).expect("first level should exist");
472
473            if !l0.is_empty() && l0.is_disjoint() {
474                let lmax_index = version.level_count() - 1;
475
476                if (1..lmax_index).any(|idx| {
477                    #[expect(
478                        clippy::expect_used,
479                        reason = "levels within level_count are guaranteed to exist"
480                    )]
481                    let level = version.level(idx).expect("level should exist");
482                    !level.is_empty()
483                }) {
484                    // There are intermediary levels with data, cannot trivially move to Lmax
485                    break 'trivial_lmax;
486                }
487
488                #[expect(
489                    clippy::expect_used,
490                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
491                )]
492                let lmax = version.level(lmax_index).expect("last level should exist");
493
494                if !lmax
495                    .aggregate_key_range_cmp(cmp)
496                    .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
497                {
498                    return Choice::Move(CompactionInput {
499                        table_ids: l0.list_ids(),
500                        #[expect(
501                            clippy::cast_possible_truncation,
502                            reason = "level count is at most 7, fits in u8"
503                        )]
504                        dest_level: lmax_index as u8,
505                        canonical_level: 1,
506                        target_size: self.target_size,
507                    });
508                }
509            }
510        }
511
512        // Find the level that corresponds to L1
513        #[expect(clippy::map_unwrap_or)]
514        let first_non_empty_level = version
515            .iter_levels()
516            .enumerate()
517            .skip(1)
518            .find(|(_, lvl)| !lvl.is_empty())
519            .map(|(idx, _)| idx)
520            .unwrap_or_else(|| version.level_count() - 1);
521
522        let mut canonical_l1_idx = first_non_empty_level;
523
524        // Number of levels we have to shift to get from the actual level idx to the canonical
525        let mut level_shift = canonical_l1_idx - 1;
526
527        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
528            let need_new_l1 = version
529                .iter_levels()
530                .enumerate()
531                .skip(1)
532                .filter(|(_, lvl)| !lvl.is_empty())
533                .all(|(idx, level)| {
534                    let level_size = level
535                        .iter()
536                        .flat_map(|x| x.iter())
537                        // NOTE: Take bytes that are already being compacted into account,
538                        // otherwise we may be overcompensating
539                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
540                        .map(Table::file_size)
541                        .sum::<u64>();
542
543                    #[expect(
544                        clippy::cast_possible_truncation,
545                        reason = "level index is bounded by level count (7, technically 255)"
546                    )]
547                    let target_size = self.level_target_size((idx - level_shift) as u8);
548
549                    level_size > target_size
550                });
551
552            // Move up L1 one level if all current levels are at capacity
553            if need_new_l1 {
554                canonical_l1_idx -= 1;
555                level_shift -= 1;
556            }
557        }
558
559        // Trivial move into L1
560        'trivial: {
561            let first_level = version.l0();
562            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);
563
564            if first_level.run_count() == 1 {
565                if version.level_is_busy(0, state.hidden_set())
566                    || version.level_is_busy(target_level_idx, state.hidden_set())
567                {
568                    break 'trivial;
569                }
570
571                let Some(target_level) = &version.level(target_level_idx) else {
572                    break 'trivial;
573                };
574
575                if target_level.run_count() != 1 {
576                    break 'trivial;
577                }
578
579                let key_range = first_level.aggregate_key_range_cmp(cmp);
580
581                // Get overlapping tables in next level
582                let get_overlapping = target_level
583                    .iter()
584                    .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
585                    .map(Table::id)
586                    .next();
587
588                if get_overlapping.is_none() && first_level.is_disjoint() {
589                    #[expect(
590                        clippy::cast_possible_truncation,
591                        reason = "level index is bounded by level count (7)"
592                    )]
593                    return Choice::Move(CompactionInput {
594                        table_ids: first_level.list_ids(),
595                        dest_level: target_level_idx as u8,
596                        canonical_level: 1,
597                        target_size: self.target_size,
598                    });
599                }
600            }
601        }
602
603        // Intra-L0 compaction: merge multiple L0 runs into a single run within L0
604        // when table count is below the L0→L1 threshold
605        {
606            let first_level = version.l0();
607
608            if first_level.run_count() > 1
609                && first_level.table_count() < usize::from(self.l0_threshold)
610                && !version.level_is_busy(0, state.hidden_set())
611            {
612                return Choice::Merge(CompactionInput {
613                    table_ids: first_level.list_ids(),
614                    dest_level: 0,
615                    canonical_level: 0,
616                    target_size: self.target_size,
617                });
618            }
619        }
620
621        // Compute level targets (supports both static and dynamic modes)
622        let level_targets = self.compute_level_targets(version, level_shift, state);
623
624        // Scoring
625        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];
626
627        {
628            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
629            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
630            // decisions can prioritize tables that would free the most reclaimable values.
631
632            // Score first level
633            let first_level = version.l0();
634
635            if first_level.table_count() >= usize::from(self.l0_threshold) {
636                #[expect(
637                    clippy::cast_precision_loss,
638                    reason = "precision loss is acceptable for scoring calculations"
639                )]
640                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
641                scores[0] = (ratio, 0);
642            }
643
644            // Score L1+
645            for (idx, level) in version.iter_levels().enumerate().skip(1) {
646                if level.is_empty() {
647                    continue;
648                }
649
650                let level_size = level
651                    .iter()
652                    .flat_map(|x| x.iter())
653                    // NOTE: Take bytes that are already being compacted into account,
654                    // otherwise we may be overcompensating
655                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
656                    .map(Table::file_size)
657                    .sum::<u64>();
658
659                // NOTE: We check for level length above
660                #[expect(clippy::indexing_slicing)]
661                let target_size = level_targets[idx];
662
663                #[expect(
664                    clippy::indexing_slicing,
665                    reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
666                )]
667                if level_size > target_size {
668                    #[expect(
669                        clippy::cast_precision_loss,
670                        reason = "precision loss is acceptable for scoring calculations"
671                    )]
672                    let score = level_size as f64 / target_size as f64;
673                    scores[idx] = (score, level_size - target_size);
674
675                    // NOTE: Force a trivial move
676                    if version
677                        .level(idx + 1)
678                        .is_some_and(|next_level| next_level.is_empty())
679                    {
680                        scores[idx] = (99.99, 999);
681                    }
682                }
683            }
684
685            // NOTE: Never score Lmax
686            {
687                scores[6] = (0.0, 0);
688            }
689        }
690
691        // Choose compaction
692        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
693        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
694            .into_iter()
695            .enumerate()
696            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
697                score_a
698                    .partial_cmp(score_b)
699                    .unwrap_or(std::cmp::Ordering::Equal)
700            })
701            .expect("should have highest score somewhere");
702
703        if score < 1.0 {
704            return Choice::DoNothing;
705        }
706
707        // We choose L0->L1 compaction
708        if level_idx_with_highest_score == 0 {
709            let Some(first_level) = version.level(0) else {
710                return Choice::DoNothing;
711            };
712
713            if version.level_is_busy(0, state.hidden_set())
714                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
715            {
716                return Choice::DoNothing;
717            }
718
719            let Some(target_level) = &version.level(canonical_l1_idx) else {
720                return Choice::DoNothing;
721            };
722
723            let mut table_ids = first_level.list_ids();
724
725            let key_range = first_level.aggregate_key_range_cmp(cmp);
726
727            // Get overlapping tables in next level
728            let target_level_overlapping_table_ids: Vec<_> = target_level
729                .iter()
730                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
731                .map(Table::id)
732                .collect();
733
734            table_ids.extend(&target_level_overlapping_table_ids);
735
736            // Multi-level compaction: if L1 is already oversized, skip it
737            // and compact L0+L1 directly into L2 in one pass.
738            // NOTE: Currently triggers on pre-compaction L1 score. A future
739            // improvement could use projected post-compaction bytes to also
740            // catch cases where L1 is close to its target and this batch
741            // would push it over.
742            if self.multi_level {
743                let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
744                let l2_idx = canonical_l1_idx + 1;
745
746                if l1_score > 1.0
747                    && l2_idx < version.level_count()
748                    && !version.level_is_busy(l2_idx, state.hidden_set())
749                {
750                    if let Some(l2) = version.level(l2_idx) {
751                        // Include ALL L1 tables (we're emptying L1 into L2)
752                        table_ids.extend(target_level.list_ids());
753
754                        // Include overlapping L2 tables — query per merged
755                        // interval instead of one coarse aggregate (#72).
756                        // An aggregate across disjoint tables (e.g. [a,d] and
757                        // [x,z] → [a,z]) covers gaps and pulls in L2 tables
758                        // that don't actually overlap any input table.
759                        //
760                        // Merge input key ranges into disjoint intervals first
761                        // to reduce redundant queries when L0 tables overlap
762                        // (#122 Part 2). Sort by comparator-min, then coalesce.
763                        {
764                            let mut input_ranges: Vec<_> = target_level
765                                .iter()
766                                .chain(first_level.iter())
767                                .flat_map(|run| run.iter())
768                                .map(|t| t.key_range().clone())
769                                .collect();
770                            input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));
771
772                            let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);
773
774                            for run in l2.iter() {
775                                for interval in &merged {
776                                    for l2t in run.get_overlapping_cmp(interval, cmp) {
777                                        table_ids.insert(Table::id(l2t));
778                                    }
779                                }
780                            }
781                        }
782
783                        #[expect(
784                            clippy::cast_possible_truncation,
785                            reason = "level index is bounded by level count (7)"
786                        )]
787                        return Choice::Merge(CompactionInput {
788                            table_ids,
789                            dest_level: l2_idx as u8,
790                            canonical_level: 2,
791                            target_size: self.target_size,
792                        });
793                    }
794                }
795            }
796
797            #[expect(
798                clippy::cast_possible_truncation,
799                reason = "level index is bounded by level count (7, technically 255)"
800            )]
801            let choice = CompactionInput {
802                table_ids,
803                dest_level: canonical_l1_idx as u8,
804                canonical_level: 1,
805                target_size: self.target_size,
806            };
807
808            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
809                return Choice::Move(choice);
810            }
811            return Choice::Merge(choice);
812        }
813
814        // We choose L1+ compaction instead
815
816        // NOTE: Level count is 255 max
817        #[expect(clippy::cast_possible_truncation)]
818        let curr_level_index = level_idx_with_highest_score as u8;
819
820        let next_level_index = curr_level_index + 1;
821
822        let Some(level) = version.level(level_idx_with_highest_score) else {
823            return Choice::DoNothing;
824        };
825
826        let Some(next_level) = version.level(next_level_index as usize) else {
827            return Choice::DoNothing;
828        };
829
830        // pick_minimal_compaction scans all runs in both levels, handling
831        // transient multi-run states from multi-level compaction (#108, #122).
832        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
833            level,
834            next_level,
835            state.hidden_set(),
836            overshoot_bytes,
837            self.target_size,
838            cmp,
839        ) else {
840            return Choice::DoNothing;
841        };
842
843        #[expect(
844            clippy::cast_possible_truncation,
845            reason = "level shift is bounded by level count (7, technically 255)"
846        )]
847        let choice = CompactionInput {
848            table_ids,
849            dest_level: next_level_index,
850            canonical_level: next_level_index - (level_shift as u8),
851            target_size: self.target_size,
852        };
853
854        if can_trivial_move && level.is_disjoint() {
855            return Choice::Move(choice);
856        }
857        Choice::Merge(choice)
858    }
859}