lsm_tree/compaction/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5//! Contains compaction strategies
6
7pub(crate) mod fifo;
8pub(crate) mod leveled;
9// pub(crate) mod maintenance;
10#[cfg(feature = "std")]
11pub(crate) mod delete_materialize;
12pub(crate) mod drop_range;
13pub mod filter;
14mod flavour;
15pub(crate) mod heal;
16pub(crate) mod major;
17pub(crate) mod movedown;
18pub(crate) mod pulldown;
19pub(crate) mod seqno_zeroer;
20pub(crate) mod state;
21pub(crate) mod stream;
22pub(crate) mod tiered;
23pub(crate) mod worker;
24
25pub use fifo::Strategy as Fifo;
26pub use filter::{CompactionFilter, Factory, ItemAccessor, Verdict};
27pub use heal::Strategy as EccHeal;
28pub use leveled::Strategy as Leveled;
29pub use tiered::Strategy as SizeTiered;
30
31pub use {
32 fifo::NAME as FIFO_COMPACTION_NAME, leveled::NAME as LEVELED_COMPACTION_NAME,
33 tiered::NAME as TIERED_COMPACTION_NAME,
34};
35
/// Alternative spelling of [`Leveled`], provided as a type alias.
pub type Levelled = Leveled;
38
39#[doc(hidden)]
40pub use movedown::Strategy as MoveDown;
41
42#[doc(hidden)]
43pub use pulldown::Strategy as PullDown;
44
45use crate::{
46 HashSet, KvPair, TableId, compaction::state::CompactionState, config::Config, version::Version,
47};
48#[cfg(not(feature = "std"))]
49use alloc::vec::Vec;
50
/// Outcome category of a single compaction run.
///
/// Stored in [`CompactionResult::action`] so a caller can tell which path
/// the run took.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CompactionAction {
    /// The strategy decided there was no work to do.
    Nothing,

    /// Input tables were rewritten (merged) into a destination level.
    Merged,

    /// Input tables were relocated to a deeper level; nothing was rewritten.
    Moved,

    /// Input tables were discarded without running a compaction.
    Dropped,
}
66
67/// Result of a compaction operation, describing what happened.
68///
69/// Returned by [`crate::AbstractTree::compact`] to give callers
70/// observability into which compaction path was taken.
71#[derive(Debug, Clone, PartialEq, Eq)]
72pub struct CompactionResult {
73 /// The action that was taken.
74 pub action: CompactionAction,
75
76 /// The destination level, if applicable.
77 pub dest_level: Option<u8>,
78
79 /// Number of input tables consumed.
80 pub tables_in: usize,
81
82 /// Number of output tables produced.
83 pub tables_out: usize,
84}
85
86impl CompactionResult {
87 /// Creates a result for the "do nothing" case.
88 #[must_use]
89 pub fn nothing() -> Self {
90 Self {
91 action: CompactionAction::Nothing,
92 dest_level: None,
93 tables_in: 0,
94 tables_out: 0,
95 }
96 }
97}
98
99/// Input for compactor
100///
101/// The compaction strategy chooses which tables to compact and how.
102/// That information is given to the compactor.
103#[derive(Debug, Clone, Eq, PartialEq)]
104pub struct Input {
105 /// Tables to compact
106 pub table_ids: HashSet<TableId>,
107
108 /// Level to put the created tables into
109 pub dest_level: u8,
110
111 /// The logical level the tables are part of
112 pub canonical_level: u8,
113
114 /// Table target size
115 ///
116 /// If a table merge reaches the size threshold, a new table is started.
117 /// This results in a sorted "run" of tables.
118 pub target_size: u64,
119}
120
121/// Describes what to do (compact or not)
122#[derive(Debug, Eq, PartialEq)]
123pub enum Choice {
124 /// Just do nothing.
125 DoNothing,
126
127 /// Moves tables into another level without rewriting.
128 Move(Input),
129
130 /// Compacts some tables into a new level.
131 Merge(Input),
132
133 /// Delete tables without doing compaction.
134 ///
135 /// This may be used by a compaction strategy that wants to delete old data
136 /// without having to compact it away, like [`fifo::Strategy`].
137 Drop(HashSet<TableId>),
138}
139
140/// Trait for a compaction strategy
141///
142/// The strategy receives the levels of the LSM-tree as argument
143/// and emits a choice on what to do.
144#[expect(clippy::module_name_repetitions)]
145pub trait CompactionStrategy: Send + Sync {
146 /// Gets the compaction strategy name.
147 fn get_name(&self) -> &'static str;
148
149 #[doc(hidden)]
150 fn get_config(&self) -> Vec<KvPair> {
151 vec![]
152 }
153
154 /// Decides on what to do based on the current state of the LSM-tree's levels
155 ///
156 /// This is the purely *structural* decision (level shape, run counts, size
157 /// targets); it deliberately does not see the live runtime config. The
158 /// orchestrator layers the runtime-config-driven housekeeping fallback on
159 /// top (the density-based columnar rewrite).
160 fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice;
161
162 /// Estimated bytes pending compaction: on-disk data currently sitting above
163 /// its level's target size that must eventually be rewritten downward (a
164 /// `RocksDB` `estimate-pending-compaction-bytes` analog). A scheduler /
165 /// tiering consumer reads it as a compaction-debt signal; `0` means the tree
166 /// is at or below its target shape.
167 ///
168 /// The default is `0` for strategies without a size-target notion of debt
169 /// (FIFO, drop-range, major one-shot); the leveled strategy overrides it with
170 /// the per-level overflow sum.
171 fn pending_compaction_bytes(&self, _version: &Version) -> u64 {
172 0
173 }
174}
175
176/// Runs a strategy's structural decision and, only when it would otherwise idle,
177/// layers the runtime-config-driven density rewrite on top.
178///
179/// This keeps [`CompactionStrategy::choose`] purely structural (no runtime-config
180/// dependency) while applying the housekeeping fallback uniformly to every
181/// strategy at the orchestration layer, where the live `runtime_config` already
182/// lives. The fallback never preempts real work: it only fires when `choose`
183/// returned [`Choice::DoNothing`], and [`pick_density_rewrite`] self-gates to a
184/// no-op unless a level runs an [`Adaptive`](crate::config::DeleteStrategy::Adaptive)
185/// strategy and holds a dense-enough segment.
186pub(crate) fn choose_with_density_rewrite(
187 strategy: &dyn CompactionStrategy,
188 version: &Version,
189 config: &Config,
190 runtime_config: &crate::runtime_config::RuntimeConfig,
191 state: &CompactionState,
192) -> Choice {
193 let structural = strategy.choose(version, config, state);
194 if matches!(structural, Choice::DoNothing)
195 && let Some(input) = pick_density_rewrite(version, runtime_config, state)
196 {
197 return Choice::Merge(input);
198 }
199 structural
200}
201
202/// Picks a columnar segment whose positional delete-bitmap density has grown
203/// past its level's adaptive purge threshold, for a single-input materializing
204/// rewrite: the masked scan drops the dead rows and the fresh segment carries no
205/// bitmap, reclaiming the merge-on-read mask cost paid on every scan.
206///
207/// Returns the densest eligible candidate, or `None` when no level runs an
208/// [`Adaptive`](crate::config::DeleteStrategy::Adaptive) strategy (a fixed
209/// `CopyOnWrite` never accumulates a bitmap, a fixed `MergeOnRead` never purges)
210/// or no live segment is dense enough. Segments already enrolled in another
211/// compaction (`hidden_set`) are skipped. This self-gating is why the orchestrator
212/// can apply it uniformly after any strategy.
213///
214/// The live `runtime_config` (not the boot snapshot) supplies the per-level
215/// threshold, so an operator's `update_runtime_config` takes effect. The rewrite
216/// targets the candidate's own on-disk size, so the single survivor run stays one
217/// segment.
218pub(crate) fn pick_density_rewrite(
219 version: &Version,
220 runtime_config: &crate::runtime_config::RuntimeConfig,
221 state: &CompactionState,
222) -> Option<Input> {
223 // (density, id, level, file_size) of the densest eligible candidate.
224 let mut best: Option<(u8, TableId, usize, u64)> = None;
225 for (level_idx, level) in version.iter_levels().enumerate() {
226 let crate::config::DeleteStrategy::Adaptive {
227 purge_threshold_percent,
228 } = runtime_config.delete_strategy.get(level_idx)
229 else {
230 continue;
231 };
232 for table in level.iter().flat_map(|run| run.iter()) {
233 if state.hidden_set().is_hidden(table.id()) {
234 continue;
235 }
236 let Some(density) = table.delete_density() else {
237 continue;
238 };
239 if density < purge_threshold_percent {
240 continue;
241 }
242 let take = match best {
243 None => true,
244 Some((best_density, _, _, _)) => density > best_density,
245 };
246 if take {
247 best = Some((density, table.id(), level_idx, table.file_size()));
248 }
249 }
250 }
251
252 let (_, table_id, level_idx, file_size) = best?;
253 let mut table_ids = HashSet::default();
254 table_ids.insert(table_id);
255 #[expect(
256 clippy::cast_possible_truncation,
257 reason = "level index is bounded by level count (<= 7, technically 255)"
258 )]
259 let level = level_idx as u8;
260 Some(Input {
261 table_ids,
262 // In-place single-input rewrite: dest == source level. The masked scan
263 // materializes the survivors; plan_merge_on_read declines a
264 // bitmap-carrying source, so this runs the copy-on-write merge path.
265 dest_level: level,
266 canonical_level: level,
267 // Target the source's own size so the survivor run stays a single segment.
268 target_size: file_size,
269 })
270}