lsm_tree/compaction/tiered/mod.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 HashSet, KvPair, TableId, compaction::state::CompactionState, config::Config, table::Table,
8 version::Version,
9};
10#[cfg(not(feature = "std"))]
11use alloc::vec::Vec;
12
13#[cfg(test)]
14mod tests;
15
/// Identifier of the size-tiered strategy, returned by its
/// `CompactionStrategy::get_name` implementation.
#[doc(hidden)]
pub const NAME: &str = "SizeTieredCompaction";
18
/// Size-tiered compaction strategy (STCS), also known as Universal compaction.
///
/// All sorted runs live in L0. When enough similarly-sized runs accumulate,
/// they are merged into a single larger run (still in L0). This minimizes
/// write amplification at the cost of higher read and space amplification.
///
/// Best for write-heavy workloads: posting list merges, counters, time-series
/// append-only data.
///
/// # Algorithm
///
/// Only L0 runs none of whose tables are currently being compacted are
/// considered, and nothing is scheduled while fewer than two such runs exist.
///
/// 1. **Space amplification check:** if
///    `(total_size / largest_run_size - 1) × 100` reaches or exceeds
///    [`max_space_amplification_percent`](Strategy::with_max_space_amplification_percent),
///    all considered runs are merged (full compaction). This path is not
///    capped by [`max_merge_width`](Strategy::with_max_merge_width).
/// 2. **Size-ratio merge:** runs are sorted by size (smallest first). The
///    longest prefix where each consecutive pair satisfies
///    `next.size / prev.size <= 1.0 + size_ratio` is selected (a zero-size
///    run is always considered similar to its successor). The prefix is
///    capped at [`max_merge_width`](Strategy::with_max_merge_width) runs; if
///    the capped length is still ≥
///    [`min_merge_width`](Strategy::with_min_merge_width), those runs are
///    merged.
///
/// # Trade-offs vs Leveled
///
/// `T` is the size ratio between tiers/levels and `L` the number of them.
///
/// | Metric | STCS | Leveled |
/// |--------|------|---------|
/// | Write amplification | ~O(L) | ~O(T×L) |
/// | Read amplification | Higher (more runs) | Lower (1 run per level) |
/// | Space amplification | Bounded by `max_space_amplification_percent` (default: up to 3× the largest run), plus up to 2× transiently during a full merge | ~1.1× |
#[derive(Clone)]
pub struct Strategy {
    /// Maximum allowed size ratio between adjacent sorted runs (by size) for
    /// them to be considered "similar" and eligible for merging together.
    ///
    /// For two adjacent runs sorted by size, if `larger / smaller <= 1.0 + size_ratio`,
    /// they are considered similar.
    ///
    /// Default = 1.0 (adjacent run can be up to 2× the previous).
    size_ratio: f64,

    /// Minimum number of similarly-sized sorted runs required before
    /// triggering a size-ratio merge.
    ///
    /// Default = 4.
    min_merge_width: usize,

    /// Maximum number of sorted runs merged by a single size-ratio merge.
    /// Space-amplification full compactions are not limited by this value.
    ///
    /// Default = `usize::MAX` (unlimited).
    max_merge_width: usize,

    /// When space amplification reaches this percentage, a full compaction
    /// of all (idle) runs is triggered.
    ///
    /// Space amplification is computed as `(total_size / largest_run_size - 1) × 100`.
    ///
    /// Default = 200 (i.e. 200%: a full compaction fires once total data
    /// reaches 3× the largest run).
    max_space_amplification_percent: u64,

    /// Target table size on disk (possibly compressed) for output tables.
    ///
    /// Default = 64 MiB.
    target_size: u64,
}
81
82impl Default for Strategy {
83 fn default() -> Self {
84 Self {
85 size_ratio: 1.0,
86 min_merge_width: 4,
87 max_merge_width: usize::MAX,
88 max_space_amplification_percent: 200,
89 target_size: 64 * 1_024 * 1_024,
90 }
91 }
92}
93
94impl Strategy {
95 /// Sets the size ratio threshold for considering runs "similar".
96 ///
97 /// Two adjacent runs (sorted by size) are similar if
98 /// `larger / smaller <= 1.0 + size_ratio`.
99 ///
100 /// Same as `compaction_options_universal.size_ratio` in `RocksDB`.
101 ///
102 /// Default = 1.0
103 #[must_use]
104 pub fn with_size_ratio(mut self, ratio: f64) -> Self {
105 // Clamp invalid values: NaN, negative, and infinite are replaced
106 // with the default (1.0). Zero is allowed (exact-size-match only).
107 self.size_ratio = if ratio.is_finite() && ratio >= 0.0 {
108 ratio
109 } else {
110 1.0
111 };
112 self
113 }
114
115 /// Sets the minimum number of runs to merge at once.
116 ///
117 /// Same as `compaction_options_universal.min_merge_width` in `RocksDB`.
118 ///
119 /// Default = 4
120 #[must_use]
121 pub fn with_min_merge_width(mut self, width: usize) -> Self {
122 self.min_merge_width = width.max(2);
123 self
124 }
125
126 /// Sets the maximum number of runs to merge at once.
127 ///
128 /// Same as `compaction_options_universal.max_merge_width` in `RocksDB`.
129 ///
130 /// Default = `usize::MAX`
131 #[must_use]
132 pub fn with_max_merge_width(mut self, width: usize) -> Self {
133 self.max_merge_width = width.max(2);
134 self
135 }
136
137 /// Sets the space amplification threshold (in percent) that triggers
138 /// a full compaction of all runs.
139 ///
140 /// Same as `compaction_options_universal.max_size_amplification_percent` in `RocksDB`.
141 ///
142 /// Default = 200
143 #[must_use]
144 pub fn with_max_space_amplification_percent(mut self, percent: u64) -> Self {
145 self.max_space_amplification_percent = percent;
146 self
147 }
148
149 /// Sets the target table size on disk (possibly compressed).
150 ///
151 /// Default = 64 MiB
152 #[must_use]
153 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
154 self.target_size = bytes;
155 self
156 }
157}
158
159/// Per-run metadata for compaction decisions.
160struct RunInfo {
161 /// Total on-disk size of all tables in this run.
162 size: u64,
163
164 /// Table IDs belonging to this run.
165 table_ids: Vec<TableId>,
166}
167
168/// Collects run information from L0, filtering out runs with hidden tables.
169fn collect_available_runs(version: &Version, state: &CompactionState) -> Vec<RunInfo> {
170 let l0 = version.l0();
171
172 l0.iter()
173 .filter_map(|run| {
174 // Skip runs that have any table in the hidden set (being compacted)
175 if run
176 .iter()
177 .any(|table| state.hidden_set().is_hidden(table.id()))
178 {
179 return None;
180 }
181
182 let size = run.iter().map(Table::file_size).sum::<u64>();
183 let table_ids = run.iter().map(Table::id).collect();
184
185 Some(RunInfo { size, table_ids })
186 })
187 .collect()
188}
189
190impl CompactionStrategy for Strategy {
191 fn get_name(&self) -> &'static str {
192 NAME
193 }
194
195 fn get_config(&self) -> Vec<KvPair> {
196 use crate::io::{LittleEndian, WriteBytesExt};
197
198 let mut size_ratio_bytes = vec![];
199 #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
200 size_ratio_bytes
201 .write_f64::<LittleEndian>(self.size_ratio)
202 .expect("cannot fail");
203
204 vec![
205 (
206 crate::UserKey::from("tiered_size_ratio"),
207 crate::UserValue::from(size_ratio_bytes),
208 ),
209 (
210 crate::UserKey::from("tiered_min_merge_width"),
211 crate::UserValue::from(
212 #[expect(
213 clippy::cast_possible_truncation,
214 reason = "min_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
215 )]
216 (self.min_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
217 ),
218 ),
219 (
220 crate::UserKey::from("tiered_max_merge_width"),
221 crate::UserValue::from(
222 #[expect(
223 clippy::cast_possible_truncation,
224 reason = "max_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
225 )]
226 (self.max_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
227 ),
228 ),
229 (
230 crate::UserKey::from("tiered_max_space_amp_pct"),
231 crate::UserValue::from(self.max_space_amplification_percent.to_le_bytes()),
232 ),
233 (
234 crate::UserKey::from("tiered_target_size"),
235 crate::UserValue::from(self.target_size.to_le_bytes()),
236 ),
237 ]
238 }
239
240 fn pending_compaction_bytes(&self, version: &Version) -> u64 {
241 // Tiered debt mirrors the space-amplification trigger in `choose`: the
242 // bytes by which total L0 size exceeds the budget
243 // `largest_run * (1 + max_space_amplification_percent / 100)`. This makes
244 // the bytes-axis backpressure (bytes_slowdown / bytes_stop) engage for
245 // size-tiered trees, not only leveled. All runs are counted, including
246 // any already being compacted: the redundancy is on disk until the full
247 // compaction lands. Zero with fewer than two runs (nothing to reclaim).
248 let l0 = version.l0();
249 let mut total: u128 = 0;
250 let mut largest: u128 = 0;
251 let mut run_count = 0usize;
252 for run in l0.iter() {
253 let size: u128 = run.iter().map(|t| u128::from(Table::file_size(t))).sum();
254 total += size;
255 largest = largest.max(size);
256 run_count += 1;
257 }
258 if run_count < 2 || largest == 0 {
259 return 0;
260 }
261 // Same *100-scaled comparison as `choose` (avoids f64 precision loss);
262 // `rhs` saturates so an effectively unbounded threshold yields zero debt.
263 let lhs = total * 100;
264 let rhs = largest.saturating_mul(100 + u128::from(self.max_space_amplification_percent));
265 // saturating_sub: debt floors at zero by definition (none within budget).
266 let debt = lhs.saturating_sub(rhs) / 100;
267 u64::try_from(debt).unwrap_or(u64::MAX)
268 }
269
270 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
271 let runs = collect_available_runs(version, state);
272
273 if runs.len() < 2 {
274 return Choice::DoNothing;
275 }
276
277 // --- Space amplification check ---
278 //
279 // The largest run is treated as the "base" data set. Everything else
280 // is overhead. If overhead exceeds the threshold, compact everything.
281 let total_size: u64 = runs.iter().map(|r| r.size).sum();
282 let largest_run_size = runs.iter().map(|r| r.size).max().unwrap_or(0);
283
284 if largest_run_size > 0 {
285 // Integer arithmetic to avoid f64 precision loss on large sizes.
286 // (total / largest - 1) * 100 >= threshold
287 // is equivalent to:
288 // total * 100 >= largest * (100 + threshold)
289 // `lhs` multiplies a u64-derived u128 by the constant 100, which
290 // cannot overflow u128 — plain multiply. `rhs` multiplies by
291 // `100 + max_space_amplification_percent`, and the percentage is an
292 // unvalidated u64 (tests pass u64::MAX), so that product CAN exceed
293 // u128 — saturate. When `rhs` saturates to u128::MAX the `lhs >= rhs`
294 // check below cannot fire, so a huge amplification bound simply never
295 // triggers a space-amplification compaction, matching the intent of a
296 // very high threshold (tolerate maximum amplification).
297 let lhs = u128::from(total_size) * 100;
298 let rhs = u128::from(largest_run_size)
299 .saturating_mul(100 + u128::from(self.max_space_amplification_percent));
300
301 if lhs >= rhs {
302 let table_ids: HashSet<TableId> = runs
303 .iter()
304 .flat_map(|r| r.table_ids.iter().copied())
305 .collect();
306
307 return Choice::Merge(CompactionInput {
308 table_ids,
309 dest_level: 0,
310 canonical_level: 0,
311 target_size: self.target_size,
312 });
313 }
314 }
315
316 // --- Size-ratio triggered merge ---
317 //
318 // Sort runs by size (smallest first), then find the longest prefix
319 // where adjacent runs have similar sizes.
320 let mut sorted_runs = runs;
321 sorted_runs.sort_by(|a, b| a.size.cmp(&b.size));
322
323 let mut prefix_len = 1;
324
325 for window in sorted_runs.windows(2) {
326 // NOTE: windows(2) guarantees exactly 2 elements
327 let (Some(smaller), Some(larger)) = (window.first(), window.get(1)) else {
328 unreachable!("windows(2) always yields slices of length 2");
329 };
330
331 if smaller.size == 0 {
332 // Zero-size run: always "similar" to the next
333 prefix_len += 1;
334 continue;
335 }
336
337 #[expect(
338 clippy::cast_precision_loss,
339 reason = "precision loss is acceptable for ratio comparison"
340 )]
341 let ratio = larger.size as f64 / smaller.size as f64;
342
343 if ratio <= 1.0 + self.size_ratio {
344 prefix_len += 1;
345 } else {
346 break;
347 }
348 }
349
350 if prefix_len >= self.min_merge_width {
351 // Cap at max_merge_width, but ensure we still meet min_merge_width
352 // (guards against misconfigured max < min)
353 let merge_count = prefix_len.min(self.max_merge_width);
354
355 if merge_count >= self.min_merge_width {
356 let table_ids: HashSet<TableId> = sorted_runs
357 .iter()
358 .take(merge_count)
359 .flat_map(|r| r.table_ids.iter().copied())
360 .collect();
361
362 return Choice::Merge(CompactionInput {
363 table_ids,
364 dest_level: 0,
365 canonical_level: 0,
366 target_size: self.target_size,
367 });
368 }
369 }
370
371 Choice::DoNothing
372 }
373}