Skip to main content

lsm_tree/compaction/leveled/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5#[cfg(test)]
6#[allow(
7    clippy::unwrap_used,
8    clippy::indexing_slicing,
9    clippy::useless_vec,
10    clippy::unnecessary_map_or,
11    reason = "test code"
12)]
13mod test;
14
15use super::{Choice, CompactionStrategy, Input as CompactionInput};
16use crate::{
17    HashSet, TableId,
18    compaction::state::{CompactionState, hidden_set::HiddenSet},
19    config::Config,
20    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
21    table::{Table, util::aggregate_run_key_range},
22    version::{Level, Version, run::Ranged},
23};
24#[cfg(not(feature = "std"))]
25use alloc::vec::Vec;
26
27/// Tries to find the most optimal compaction set from one level into the other.
28///
29/// Scans all runs in both levels to handle transient multi-run states from
30/// multi-level compaction (#108). See #122 Part 3.
31fn pick_minimal_compaction(
32    curr_level: &Level,
33    next_level: &Level,
34    hidden_set: &HiddenSet,
35    _overshoot: u64,
36    table_base_size: u64,
37    cmp: &dyn crate::comparator::UserComparator,
38) -> Option<(HashSet<TableId>, bool)> {
39    // NOTE: Find largest trivial move (if it exists)
40    // Check all runs in curr_level for a window that doesn't overlap ANY run
41    // in next_level.
42    for curr_run in curr_level.iter() {
43        if let Some(window) = curr_run.shrinking_windows().find(|window| {
44            if hidden_set.is_blocked(window.iter().map(Table::id)) {
45                return false;
46            }
47
48            if next_level.is_empty() {
49                return true;
50            }
51
52            let key_range = aggregate_run_key_range(window);
53
54            // Must not overlap ANY run in the next level
55            next_level
56                .iter()
57                .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
58        }) {
59            let ids = window.iter().map(Table::id).collect();
60            return Some((ids, true));
61        }
62    }
63
64    // NOTE: Look for merges
65    // Iterate windows across all runs in next_level, pull in from all runs
66    // in curr_level.
67    if next_level.is_empty() {
68        return None;
69    }
70
71    next_level
72        .iter()
73        .flat_map(|run| {
74            // Cap per-run windows at 50x table_base_size. take_while is safe
75            // here because growing_windows within a single run are monotonically
76            // increasing in size — once one exceeds the cap, all subsequent will too.
77            run.growing_windows().take_while(|window| {
78                let size = window.iter().map(Table::file_size).sum::<u64>();
79                size <= (50 * table_base_size)
80            })
81        })
82        .filter_map(|window| {
83            if hidden_set.is_blocked(window.iter().map(Table::id)) {
84                return None;
85            }
86
87            let key_range = aggregate_run_key_range(window);
88
89            // Pull in contained tables from ALL runs in curr_level
90            let curr_level_pull_in: Vec<&Table> = curr_level
91                .iter()
92                .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
93                .collect();
94
95            let curr_level_size = curr_level_pull_in
96                .iter()
97                .map(|t| Table::file_size(t))
98                .sum::<u64>();
99
100            if curr_level_size == 0 {
101                return None;
102            }
103
104            if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
105                return None;
106            }
107
108            let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
109            let compaction_bytes = curr_level_size + next_level_size;
110
111            Some((window, curr_level_pull_in, compaction_bytes))
112        })
113        .min_by_key(|(_, _, bytes)| *bytes)
114        .map(|(window, curr_level_pull_in, _)| {
115            let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
116            ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
117            (ids, false)
118        })
119}
120
/// Name of this compaction strategy, as returned by `get_name`.
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
123
/// Leveled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 2 can hold up to `level_base_size * ratio^(n - 1)` bytes,
/// where `level_base_size` (the L1 target) is `target_size * l0_threshold`.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone)]
pub struct Strategy {
    /// Number of tables in L0 at which L0 is merged into L1.
    ///
    /// Also scales the L1 target size (`target_size * l0_threshold`).
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a fanout, growth rate)
    ///
    /// When there are fewer entries than level transitions, the last entry
    /// is reused; an empty policy falls back to `10.0`.
    level_ratio_policy: Vec<f32>,

    /// When true, dynamically sizes levels based on the actual data in
    /// the last non-empty level, reducing space amplification to ~1.1x.
    ///
    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
    ///
    /// Default = false (static leveling).
    dynamic: bool,

    /// When true, enables multi-level compaction: if L0→L1 is chosen but
    /// L1 is already oversized, compacts L0+L1→L2 directly in one pass
    /// to avoid a write-then-rewrite cycle.
    ///
    /// Default = false.
    multi_level: bool,
}
160
161impl Default for Strategy {
162    fn default() -> Self {
163        Self {
164            l0_threshold: 4,
165            target_size:/* 64 MiB */ 64 * 1_024 * 1_024,
166            level_ratio_policy: vec![10.0],
167            dynamic: false,
168            multi_level: false,
169        }
170    }
171}
172
173impl Strategy {
174    /// Sets the growth ratio between levels.
175    ///
176    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
177    ///
178    /// Default = [10.0]
179    #[must_use]
180    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
181        self.level_ratio_policy = policy;
182        self
183    }
184
185    /// Sets the L0 threshold.
186    ///
187    /// When the number of tables in L0 reaches this threshold,
188    /// they are merged into L1.
189    ///
190    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
191    ///
192    /// Default = 4
193    #[must_use]
194    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
195        self.l0_threshold = threshold;
196        self
197    }
198
199    /// Sets the table target size on disk (possibly compressed).
200    ///
201    /// Same as `target_file_size_base` in `RocksDB`.
202    ///
203    /// Default = 64 MiB
204    #[must_use]
205    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
206        self.target_size = bytes;
207        self
208    }
209
210    /// Enables dynamic level sizing based on actual data in the last level.
211    ///
212    /// When enabled, level target sizes are computed top-down from the actual
213    /// size of the last non-empty level, divided by the ratio at each step.
214    /// This reduces space amplification to ~1.1x while keeping write
215    /// amplification comparable to static leveling.
216    ///
217    /// Same as `level_compaction_dynamic_level_bytes` in `RocksDB`.
218    ///
219    /// Default = false
220    #[must_use]
221    pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
222        self.dynamic = enabled;
223        self
224    }
225
226    /// Enables multi-level compaction optimization.
227    ///
228    /// When L0→L1 compaction is selected but L1 already exceeds its target
229    /// size, this option allows compacting L0+L1 directly into L2 in one
230    /// pass, avoiding the write-then-rewrite cycle that would otherwise
231    /// occur.
232    ///
233    /// Default = false
234    #[must_use]
235    pub fn with_multi_level(mut self, enabled: bool) -> Self {
236        self.multi_level = enabled;
237        self
238    }
239
240    /// Calculates the size of L1.
241    fn level_base_size(&self) -> u64 {
242        self.target_size * u64::from(self.l0_threshold)
243    }
244
245    /// Calculates the level target size.
246    ///
247    /// L1 = `level_base_size`
248    ///
249    /// L2 = `level_base_size * ratio`
250    ///
251    /// L3 = `level_base_size * ratio * ratio`
252    ///
253    /// ...
254    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
255        assert!(
256            canonical_level_idx >= 1,
257            "level_target_size does not apply to L0",
258        );
259
260        if canonical_level_idx == 1 {
261            // u64::from(self.target_size)
262            self.level_base_size()
263        } else {
264            #[expect(
265                clippy::cast_precision_loss,
266                reason = "precision loss is acceptable for level size calculations"
267            )]
268            let mut size = self.level_base_size() as f32;
269
270            // NOTE: Minus 2 because |{L0, L1}|
271            for idx in 0..=(canonical_level_idx - 2) {
272                let ratio = self
273                    .level_ratio_policy
274                    .get(usize::from(idx))
275                    .copied()
276                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
277
278                size *= ratio;
279            }
280
281            #[expect(
282                clippy::cast_possible_truncation,
283                clippy::cast_sign_loss,
284                reason = "size is always positive and will never even come close to u64::MAX"
285            )]
286            {
287                size as u64
288            }
289        }
290    }
291
292    /// Computes level target sizes for all 7 levels.
293    ///
294    /// In static mode, uses the standard exponential formula.
295    /// In dynamic mode, derives targets from the actual size of the last
296    /// non-empty level, dividing backwards by the ratio at each step.
297    /// Falls back to static mode when the tree is small (dynamic L1 target
298    /// would be less than `level_base_size`).
299    fn compute_level_targets(
300        &self,
301        version: &Version,
302        level_shift: usize,
303        state: &CompactionState,
304    ) -> [u64; 7] {
305        let mut targets = [u64::MAX; 7];
306
307        // L0 target is not size-based (it's count-based), so leave at MAX
308        targets[0] = u64::MAX;
309
310        if self.dynamic {
311            // Find the last non-empty level (Lmax) and its actual size.
312            // Iterate forward and keep track of the last non-empty level
313            // since iter_levels() does not support DoubleEndedIterator.
314            let mut lmax_idx = None;
315
316            for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
317                if !lvl.is_empty() {
318                    lmax_idx = Some(idx);
319                }
320            }
321
322            if let Some(lmax_idx) = lmax_idx {
323                #[expect(
324                    clippy::expect_used,
325                    reason = "lmax_idx was found by iterating levels, so it must exist"
326                )]
327                let lmax_level = version.level(lmax_idx).expect("level should exist");
328
329                let lmax_size: u64 = lmax_level
330                    .iter()
331                    .flat_map(|run| run.iter())
332                    .filter(|table| !state.hidden_set().is_hidden(table.id()))
333                    .map(Table::file_size)
334                    .sum();
335
336                // Work backwards from Lmax
337                if let Some(slot) = targets.get_mut(lmax_idx) {
338                    *slot = lmax_size;
339                }
340
341                #[expect(
342                    clippy::cast_precision_loss,
343                    reason = "precision loss is acceptable for level size calculations"
344                )]
345                let mut current_target = lmax_size as f64;
346
347                // Only backfill down to the effective L1 (accounting for
348                // level_shift), not to physical level 1, so we don't
349                // overwrite slots below the shifted canonical L1.
350                let dynamic_l1_idx = level_shift + 1;
351
352                for idx in (dynamic_l1_idx..lmax_idx).rev() {
353                    let canonical = idx - level_shift;
354                    // In the forward formula, target(k+1)/target(k) = ratio[k-1],
355                    // so backwards: target(k) = target(k+1) / ratio[k-1]
356                    // Clamp-to-zero: the ratio for the first dynamic level reuses
357                    // index 0 rather than underflowing to a phantom level.
358                    let ratio_idx = canonical.saturating_sub(1);
359                    let ratio = f64::from(
360                        self.level_ratio_policy
361                            .get(ratio_idx)
362                            .copied()
363                            .unwrap_or_else(|| {
364                                self.level_ratio_policy.last().copied().unwrap_or(10.0)
365                            }),
366                    );
367
368                    // Guard against invalid ratios (zero, negative, NaN, infinite).
369                    // Fall back to static targets instead of leaving partial
370                    // dynamic targets with u64::MAX in lower-level slots.
371                    if !ratio.is_finite() || ratio <= 0.0 {
372                        return self.compute_static_targets(level_shift);
373                    }
374
375                    current_target /= ratio;
376
377                    #[expect(
378                        clippy::cast_possible_truncation,
379                        clippy::cast_sign_loss,
380                        reason = "target is always positive"
381                    )]
382                    if let Some(slot) = targets.get_mut(idx) {
383                        *slot = current_target as u64;
384                    }
385                }
386
387                // Fallback: if dynamic L1 target is too small, use static.
388                // Compare the shifted L1 slot, not physical slot 1.
389                let static_l1 = self.level_base_size();
390                if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
391                    return self.compute_static_targets(level_shift);
392                }
393
394                return targets;
395            }
396        }
397
398        self.compute_static_targets(level_shift)
399    }
400
401    /// Computes static (exponential) level targets.
402    fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
403        let mut targets = [u64::MAX; 7];
404
405        for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
406            if idx <= level_shift {
407                continue; // stays at u64::MAX
408            }
409            #[expect(
410                clippy::cast_possible_truncation,
411                reason = "level index is bounded by level count (7)"
412            )]
413            {
414                *slot = self.level_target_size((idx - level_shift) as u8);
415            }
416        }
417
418        targets
419    }
420}
421
422impl CompactionStrategy for Strategy {
    /// Returns the name of this strategy ([`NAME`]).
    fn get_name(&self) -> &'static str {
        NAME
    }
426
427    fn get_config(&self) -> Vec<crate::KvPair> {
428        vec![
429            (
430                crate::UserKey::from("leveled_l0_threshold"),
431                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
432            ),
433            (
434                crate::UserKey::from("leveled_target_size"),
435                crate::UserValue::from(self.target_size.to_le_bytes()),
436            ),
437            (
438                crate::UserKey::from("leveled_level_ratio_policy"),
439                crate::UserValue::from({
440                    use crate::io::{LittleEndian, WriteBytesExt};
441
442                    let mut v = vec![];
443
444                    #[expect(
445                        clippy::expect_used,
446                        clippy::cast_possible_truncation,
447                        reason = "writing into Vec should not fail; policies have length of 255 max"
448                    )]
449                    v.write_u8(self.level_ratio_policy.len() as u8)
450                        .expect("cannot fail");
451
452                    for &f in &self.level_ratio_policy {
453                        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
454                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
455                    }
456
457                    v
458                }),
459            ),
460            (
461                crate::UserKey::from("leveled_dynamic"),
462                crate::UserValue::from([u8::from(self.dynamic)]),
463            ),
464            (
465                crate::UserKey::from("leveled_multi_level"),
466                crate::UserValue::from([u8::from(self.multi_level)]),
467            ),
468        ]
469    }
470
471    fn pending_compaction_bytes(&self, version: &Version) -> u64 {
472        // Read-only snapshot: an empty compaction state (nothing hidden) so the
473        // targets reflect the full current footprint; no level shift.
474        let targets = self.compute_level_targets(version, 0, &CompactionState::default());
475        let mut debt = 0u64;
476        for (idx, level) in version.iter_levels().enumerate() {
477            // Each term is at most one level's on-disk size, so the running sum is
478            // bounded by the tree's total footprint (disk capacity) and cannot
479            // overflow u64 — plain addition.
480            if idx == 0 {
481                // L0 is count-triggered: once it reaches the L0 file threshold its
482                // whole size is pending a merge into L1. Use the table (file)
483                // count, matching the `choose` trigger; `iter().count()` would
484                // count runs, undercounting a multi-table L0 run.
485                if level.table_count() >= usize::from(self.l0_threshold) {
486                    debt += level.size();
487                }
488            } else if let Some(&target) = targets.get(idx) {
489                // max(0, size - target): the bytes above this level's target are
490                // the work that must be rewritten downward.
491                debt += level.size().saturating_sub(target);
492            }
493        }
494        debt
495    }
496
497    #[expect(clippy::too_many_lines)]
498    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
499        // Tie the invariant to the caller-supplied `Config::level_count`
500        // rather than a literal so a tree explicitly configured with a
501        // non-default level count does not panic here. The assertion still
502        // fires when a `Version` carries a different count than the
503        // `Config` driving this compactor invocation — that mismatch is a
504        // real bug (Version and Config disagree).
505        assert!(
506            version.level_count() == usize::from(config.level_count),
507            "leveled compaction requires version.level_count() == config.level_count (got version={}, config={})",
508            version.level_count(),
509            config.level_count,
510        );
511        let cmp = config.comparator.as_ref();
512
513        // Trivial move into Lmax
514        'trivial_lmax: {
515            #[expect(
516                clippy::expect_used,
517                reason = "level 0 is guaranteed to exist in a valid version"
518            )]
519            let l0 = version.level(0).expect("first level should exist");
520
521            if !l0.is_empty() && l0.is_disjoint() {
522                let lmax_index = version.level_count() - 1;
523
524                if (1..lmax_index).any(|idx| {
525                    #[expect(
526                        clippy::expect_used,
527                        reason = "levels within level_count are guaranteed to exist"
528                    )]
529                    let level = version.level(idx).expect("level should exist");
530                    !level.is_empty()
531                }) {
532                    // There are intermediary levels with data, cannot trivially move to Lmax
533                    break 'trivial_lmax;
534                }
535
536                #[expect(
537                    clippy::expect_used,
538                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
539                )]
540                let lmax = version.level(lmax_index).expect("last level should exist");
541
542                if !lmax
543                    .aggregate_key_range_cmp(cmp)
544                    .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
545                {
546                    return Choice::Move(CompactionInput {
547                        table_ids: l0.list_ids(),
548                        #[expect(
549                            clippy::cast_possible_truncation,
550                            reason = "level count is at most 7, fits in u8"
551                        )]
552                        dest_level: lmax_index as u8,
553                        canonical_level: 1,
554                        target_size: self.target_size,
555                    });
556                }
557            }
558        }
559
560        // Find the level that corresponds to L1
561        #[expect(clippy::map_unwrap_or)]
562        let first_non_empty_level = version
563            .iter_levels()
564            .enumerate()
565            .skip(1)
566            .find(|(_, lvl)| !lvl.is_empty())
567            .map(|(idx, _)| idx)
568            .unwrap_or_else(|| version.level_count() - 1);
569
570        let mut canonical_l1_idx = first_non_empty_level;
571
572        // Number of levels we have to shift to get from the actual level idx to the canonical
573        let mut level_shift = canonical_l1_idx - 1;
574
575        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
576            let need_new_l1 = version
577                .iter_levels()
578                .enumerate()
579                .skip(1)
580                .filter(|(_, lvl)| !lvl.is_empty())
581                .all(|(idx, level)| {
582                    let level_size = level
583                        .iter()
584                        .flat_map(|x| x.iter())
585                        // NOTE: Take bytes that are already being compacted into account,
586                        // otherwise we may be overcompensating
587                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
588                        .map(Table::file_size)
589                        .sum::<u64>();
590
591                    #[expect(
592                        clippy::cast_possible_truncation,
593                        reason = "level index is bounded by level count (7, technically 255)"
594                    )]
595                    let target_size = self.level_target_size((idx - level_shift) as u8);
596
597                    level_size > target_size
598                });
599
600            // Move up L1 one level if all current levels are at capacity
601            if need_new_l1 {
602                canonical_l1_idx -= 1;
603                level_shift -= 1;
604            }
605        }
606
607        // Trivial move into L1
608        'trivial: {
609            let first_level = version.l0();
610            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);
611
612            if first_level.run_count() == 1 {
613                if version.level_is_busy(0, state.hidden_set())
614                    || version.level_is_busy(target_level_idx, state.hidden_set())
615                {
616                    break 'trivial;
617                }
618
619                let Some(target_level) = &version.level(target_level_idx) else {
620                    break 'trivial;
621                };
622
623                if target_level.run_count() != 1 {
624                    break 'trivial;
625                }
626
627                let key_range = first_level.aggregate_key_range_cmp(cmp);
628
629                // Get overlapping tables in next level
630                let get_overlapping = target_level
631                    .iter()
632                    .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
633                    .map(Table::id)
634                    .next();
635
636                if get_overlapping.is_none() && first_level.is_disjoint() {
637                    #[expect(
638                        clippy::cast_possible_truncation,
639                        reason = "level index is bounded by level count (7)"
640                    )]
641                    return Choice::Move(CompactionInput {
642                        table_ids: first_level.list_ids(),
643                        dest_level: target_level_idx as u8,
644                        canonical_level: 1,
645                        target_size: self.target_size,
646                    });
647                }
648            }
649        }
650
651        // Intra-L0 compaction: merge multiple L0 runs into a single run within L0
652        // when table count is below the L0→L1 threshold
653        {
654            let first_level = version.l0();
655
656            if first_level.run_count() > 1
657                && first_level.table_count() < usize::from(self.l0_threshold)
658                && !version.level_is_busy(0, state.hidden_set())
659            {
660                return Choice::Merge(CompactionInput {
661                    table_ids: first_level.list_ids(),
662                    dest_level: 0,
663                    canonical_level: 0,
664                    target_size: self.target_size,
665                });
666            }
667        }
668
669        // Compute level targets (supports both static and dynamic modes)
670        let level_targets = self.compute_level_targets(version, level_shift, state);
671
672        // Scoring
673        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];
674
675        {
676            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
677            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
678            // decisions can prioritize tables that would free the most reclaimable values.
679
680            // Score first level
681            let first_level = version.l0();
682
683            if first_level.table_count() >= usize::from(self.l0_threshold) {
684                #[expect(
685                    clippy::cast_precision_loss,
686                    reason = "precision loss is acceptable for scoring calculations"
687                )]
688                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
689                scores[0] = (ratio, 0);
690            }
691
692            // Score L1+
693            for (idx, level) in version.iter_levels().enumerate().skip(1) {
694                if level.is_empty() {
695                    continue;
696                }
697
698                let level_size = level
699                    .iter()
700                    .flat_map(|x| x.iter())
701                    // NOTE: Take bytes that are already being compacted into account,
702                    // otherwise we may be overcompensating
703                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
704                    .map(Table::file_size)
705                    .sum::<u64>();
706
707                // NOTE: We check for level length above
708                #[expect(clippy::indexing_slicing)]
709                let target_size = level_targets[idx];
710
711                #[expect(
712                    clippy::indexing_slicing,
713                    reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
714                )]
715                if level_size > target_size {
716                    #[expect(
717                        clippy::cast_precision_loss,
718                        reason = "precision loss is acceptable for scoring calculations"
719                    )]
720                    let score = level_size as f64 / target_size as f64;
721                    scores[idx] = (score, level_size - target_size);
722
723                    // NOTE: Force a trivial move
724                    if version
725                        .level(idx + 1)
726                        .is_some_and(|next_level| next_level.is_empty())
727                    {
728                        scores[idx] = (99.99, 999);
729                    }
730                }
731            }
732
733            // NOTE: Never score Lmax
734            {
735                scores[6] = (0.0, 0);
736            }
737        }
738
739        // Choose compaction
740        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
741        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
742            .into_iter()
743            .enumerate()
744            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
745                score_a
746                    .partial_cmp(score_b)
747                    .unwrap_or(core::cmp::Ordering::Equal)
748            })
749            .expect("should have highest score somewhere");
750
751        if score < 1.0 {
752            return Choice::DoNothing;
753        }
754
755        // We choose L0->L1 compaction
756        if level_idx_with_highest_score == 0 {
757            let Some(first_level) = version.level(0) else {
758                return Choice::DoNothing;
759            };
760
761            if version.level_is_busy(0, state.hidden_set())
762                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
763            {
764                return Choice::DoNothing;
765            }
766
767            let Some(target_level) = &version.level(canonical_l1_idx) else {
768                return Choice::DoNothing;
769            };
770
771            let mut table_ids = first_level.list_ids();
772
773            let key_range = first_level.aggregate_key_range_cmp(cmp);
774
775            // Get overlapping tables in next level
776            let target_level_overlapping_table_ids: Vec<_> = target_level
777                .iter()
778                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
779                .map(Table::id)
780                .collect();
781
782            table_ids.extend(&target_level_overlapping_table_ids);
783
784            // Multi-level compaction: if L1 is already oversized, skip it
785            // and compact L0+L1 directly into L2 in one pass.
786            // NOTE: Currently triggers on pre-compaction L1 score. A future
787            // improvement could use projected post-compaction bytes to also
788            // catch cases where L1 is close to its target and this batch
789            // would push it over.
790            if self.multi_level {
791                let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
792                let l2_idx = canonical_l1_idx + 1;
793
794                if l1_score > 1.0
795                    && l2_idx < version.level_count()
796                    && !version.level_is_busy(l2_idx, state.hidden_set())
797                    && let Some(l2) = version.level(l2_idx)
798                {
799                    // Include ALL L1 tables (we're emptying L1 into L2)
800                    table_ids.extend(target_level.list_ids());
801
802                    // Include overlapping L2 tables — query per merged
803                    // interval instead of one coarse aggregate (#72).
804                    // An aggregate across disjoint tables (e.g. [a,d] and
805                    // [x,z] → [a,z]) covers gaps and pulls in L2 tables
806                    // that don't actually overlap any input table.
807                    //
808                    // Merge input key ranges into disjoint intervals first
809                    // to reduce redundant queries when L0 tables overlap
810                    // (#122 Part 2). Sort by comparator-min, then coalesce.
811                    {
812                        let mut input_ranges: Vec<_> = target_level
813                            .iter()
814                            .chain(first_level.iter())
815                            .flat_map(|run| run.iter())
816                            .map(|t| t.key_range().clone())
817                            .collect();
818                        input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));
819
820                        let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);
821
822                        for run in l2.iter() {
823                            for interval in &merged {
824                                for l2t in run.get_overlapping_cmp(interval, cmp) {
825                                    table_ids.insert(Table::id(l2t));
826                                }
827                            }
828                        }
829                    }
830
831                    #[expect(
832                        clippy::cast_possible_truncation,
833                        reason = "level index is bounded by level count (7)"
834                    )]
835                    return Choice::Merge(CompactionInput {
836                        table_ids,
837                        dest_level: l2_idx as u8,
838                        canonical_level: 2,
839                        target_size: self.target_size,
840                    });
841                }
842            }
843
844            #[expect(
845                clippy::cast_possible_truncation,
846                reason = "level index is bounded by level count (7, technically 255)"
847            )]
848            let choice = CompactionInput {
849                table_ids,
850                dest_level: canonical_l1_idx as u8,
851                canonical_level: 1,
852                target_size: self.target_size,
853            };
854
855            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
856                return Choice::Move(choice);
857            }
858            return Choice::Merge(choice);
859        }
860
861        // We choose L1+ compaction instead
862
863        // NOTE: Level count is 255 max
864        #[expect(clippy::cast_possible_truncation)]
865        let curr_level_index = level_idx_with_highest_score as u8;
866
867        let next_level_index = curr_level_index + 1;
868
869        let Some(level) = version.level(level_idx_with_highest_score) else {
870            return Choice::DoNothing;
871        };
872
873        let Some(next_level) = version.level(next_level_index as usize) else {
874            return Choice::DoNothing;
875        };
876
877        // pick_minimal_compaction scans all runs in both levels, handling
878        // transient multi-run states from multi-level compaction (#108, #122).
879        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
880            level,
881            next_level,
882            state.hidden_set(),
883            overshoot_bytes,
884            self.target_size,
885            cmp,
886        ) else {
887            return Choice::DoNothing;
888        };
889
890        #[expect(
891            clippy::cast_possible_truncation,
892            reason = "level shift is bounded by level count (7, technically 255)"
893        )]
894        let choice = CompactionInput {
895            table_ids,
896            dest_level: next_level_index,
897            canonical_level: next_level_index - (level_shift as u8),
898            target_size: self.target_size,
899        };
900
901        if can_trivial_move && level.is_disjoint() {
902            return Choice::Move(choice);
903        }
904        Choice::Merge(choice)
905    }
906}