Skip to main content

lsm_tree/compaction/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5//! Contains compaction strategies
6
7pub(crate) mod fifo;
8pub(crate) mod leveled;
9// pub(crate) mod maintenance;
10#[cfg(feature = "std")]
11pub(crate) mod delete_materialize;
12pub(crate) mod drop_range;
13pub mod filter;
14mod flavour;
15pub(crate) mod heal;
16pub(crate) mod major;
17pub(crate) mod movedown;
18pub(crate) mod pulldown;
19pub(crate) mod seqno_zeroer;
20pub(crate) mod state;
21pub(crate) mod stream;
22pub(crate) mod tiered;
23pub(crate) mod worker;
24
25pub use fifo::Strategy as Fifo;
26pub use filter::{CompactionFilter, Factory, ItemAccessor, Verdict};
27pub use heal::Strategy as EccHeal;
28pub use leveled::Strategy as Leveled;
29pub use tiered::Strategy as SizeTiered;
30
31pub use {
32    fifo::NAME as FIFO_COMPACTION_NAME, leveled::NAME as LEVELED_COMPACTION_NAME,
33    tiered::NAME as TIERED_COMPACTION_NAME,
34};
35
36/// Alias for `Leveled`
37pub type Levelled = Leveled;
38
39#[doc(hidden)]
40pub use movedown::Strategy as MoveDown;
41
42#[doc(hidden)]
43pub use pulldown::Strategy as PullDown;
44
45use crate::{
46    HashSet, KvPair, TableId, compaction::state::CompactionState, config::Config, version::Version,
47};
48#[cfg(not(feature = "std"))]
49use alloc::vec::Vec;
50
/// Which path a single compaction run ended up taking.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompactionAction {
    /// The strategy decided there was no work to do.
    Nothing,

    /// Input tables were rewritten (merged) into the destination level.
    Merged,

    /// Input tables were relocated to a deeper level as-is, without a rewrite.
    Moved,

    /// Input tables were deleted outright instead of being compacted.
    Dropped,
}

/// Summary of one compaction run.
///
/// [`crate::AbstractTree::compact`] hands this back so callers can observe
/// which compaction path ran and how many tables it touched.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CompactionResult {
    /// Which path the compaction took.
    pub action: CompactionAction,

    /// Level the output landed in, or `None` when no destination applies.
    pub dest_level: Option<u8>,

    /// How many tables the run consumed as input.
    pub tables_in: usize,

    /// How many tables the run produced as output.
    pub tables_out: usize,
}

impl CompactionResult {
    /// Builds the result reported when the strategy chose to do nothing:
    /// [`CompactionAction::Nothing`], no destination level, and zero tables
    /// in or out.
    #[must_use]
    pub fn nothing() -> Self {
        let action = CompactionAction::Nothing;
        Self {
            tables_in: 0,
            tables_out: 0,
            dest_level: None,
            action,
        }
    }
}
98
99/// Input for compactor
100///
101/// The compaction strategy chooses which tables to compact and how.
102/// That information is given to the compactor.
103#[derive(Debug, Clone, Eq, PartialEq)]
104pub struct Input {
105    /// Tables to compact
106    pub table_ids: HashSet<TableId>,
107
108    /// Level to put the created tables into
109    pub dest_level: u8,
110
111    /// The logical level the tables are part of
112    pub canonical_level: u8,
113
114    /// Table target size
115    ///
116    /// If a table merge reaches the size threshold, a new table is started.
117    /// This results in a sorted "run" of tables.
118    pub target_size: u64,
119}
120
121/// Describes what to do (compact or not)
122#[derive(Debug, Eq, PartialEq)]
123pub enum Choice {
124    /// Just do nothing.
125    DoNothing,
126
127    /// Moves tables into another level without rewriting.
128    Move(Input),
129
130    /// Compacts some tables into a new level.
131    Merge(Input),
132
133    /// Delete tables without doing compaction.
134    ///
135    /// This may be used by a compaction strategy that wants to delete old data
136    /// without having to compact it away, like [`fifo::Strategy`].
137    Drop(HashSet<TableId>),
138}
139
140/// Trait for a compaction strategy
141///
142/// The strategy receives the levels of the LSM-tree as argument
143/// and emits a choice on what to do.
144#[expect(clippy::module_name_repetitions)]
145pub trait CompactionStrategy: Send + Sync {
146    /// Gets the compaction strategy name.
147    fn get_name(&self) -> &'static str;
148
149    #[doc(hidden)]
150    fn get_config(&self) -> Vec<KvPair> {
151        vec![]
152    }
153
154    /// Decides on what to do based on the current state of the LSM-tree's levels
155    ///
156    /// This is the purely *structural* decision (level shape, run counts, size
157    /// targets); it deliberately does not see the live runtime config. The
158    /// orchestrator layers the runtime-config-driven housekeeping fallback on
159    /// top (the density-based columnar rewrite).
160    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice;
161
162    /// Estimated bytes pending compaction: on-disk data currently sitting above
163    /// its level's target size that must eventually be rewritten downward (a
164    /// `RocksDB` `estimate-pending-compaction-bytes` analog). A scheduler /
165    /// tiering consumer reads it as a compaction-debt signal; `0` means the tree
166    /// is at or below its target shape.
167    ///
168    /// The default is `0` for strategies without a size-target notion of debt
169    /// (FIFO, drop-range, major one-shot); the leveled strategy overrides it with
170    /// the per-level overflow sum.
171    fn pending_compaction_bytes(&self, _version: &Version) -> u64 {
172        0
173    }
174}
175
176/// Runs a strategy's structural decision and, only when it would otherwise idle,
177/// layers the runtime-config-driven density rewrite on top.
178///
179/// This keeps [`CompactionStrategy::choose`] purely structural (no runtime-config
180/// dependency) while applying the housekeeping fallback uniformly to every
181/// strategy at the orchestration layer, where the live `runtime_config` already
182/// lives. The fallback never preempts real work: it only fires when `choose`
183/// returned [`Choice::DoNothing`], and [`pick_density_rewrite`] self-gates to a
184/// no-op unless a level runs an [`Adaptive`](crate::config::DeleteStrategy::Adaptive)
185/// strategy and holds a dense-enough segment.
186pub(crate) fn choose_with_density_rewrite(
187    strategy: &dyn CompactionStrategy,
188    version: &Version,
189    config: &Config,
190    runtime_config: &crate::runtime_config::RuntimeConfig,
191    state: &CompactionState,
192) -> Choice {
193    let structural = strategy.choose(version, config, state);
194    if matches!(structural, Choice::DoNothing)
195        && let Some(input) = pick_density_rewrite(version, runtime_config, state)
196    {
197        return Choice::Merge(input);
198    }
199    structural
200}
201
202/// Picks a columnar segment whose positional delete-bitmap density has grown
203/// past its level's adaptive purge threshold, for a single-input materializing
204/// rewrite: the masked scan drops the dead rows and the fresh segment carries no
205/// bitmap, reclaiming the merge-on-read mask cost paid on every scan.
206///
207/// Returns the densest eligible candidate, or `None` when no level runs an
208/// [`Adaptive`](crate::config::DeleteStrategy::Adaptive) strategy (a fixed
209/// `CopyOnWrite` never accumulates a bitmap, a fixed `MergeOnRead` never purges)
210/// or no live segment is dense enough. Segments already enrolled in another
211/// compaction (`hidden_set`) are skipped. This self-gating is why the orchestrator
212/// can apply it uniformly after any strategy.
213///
214/// The live `runtime_config` (not the boot snapshot) supplies the per-level
215/// threshold, so an operator's `update_runtime_config` takes effect. The rewrite
216/// targets the candidate's own on-disk size, so the single survivor run stays one
217/// segment.
218pub(crate) fn pick_density_rewrite(
219    version: &Version,
220    runtime_config: &crate::runtime_config::RuntimeConfig,
221    state: &CompactionState,
222) -> Option<Input> {
223    // (density, id, level, file_size) of the densest eligible candidate.
224    let mut best: Option<(u8, TableId, usize, u64)> = None;
225    for (level_idx, level) in version.iter_levels().enumerate() {
226        let crate::config::DeleteStrategy::Adaptive {
227            purge_threshold_percent,
228        } = runtime_config.delete_strategy.get(level_idx)
229        else {
230            continue;
231        };
232        for table in level.iter().flat_map(|run| run.iter()) {
233            if state.hidden_set().is_hidden(table.id()) {
234                continue;
235            }
236            let Some(density) = table.delete_density() else {
237                continue;
238            };
239            if density < purge_threshold_percent {
240                continue;
241            }
242            let take = match best {
243                None => true,
244                Some((best_density, _, _, _)) => density > best_density,
245            };
246            if take {
247                best = Some((density, table.id(), level_idx, table.file_size()));
248            }
249        }
250    }
251
252    let (_, table_id, level_idx, file_size) = best?;
253    let mut table_ids = HashSet::default();
254    table_ids.insert(table_id);
255    #[expect(
256        clippy::cast_possible_truncation,
257        reason = "level index is bounded by level count (<= 7, technically 255)"
258    )]
259    let level = level_idx as u8;
260    Some(Input {
261        table_ids,
262        // In-place single-input rewrite: dest == source level. The masked scan
263        // materializes the survivors; plan_merge_on_read declines a
264        // bitmap-carrying source, so this runs the copy-on-write merge path.
265        dest_level: level,
266        canonical_level: level,
267        // Target the source's own size so the survivor run stays a single segment.
268        target_size: file_size,
269    })
270}