// lsm_tree/compaction/tiered/mod.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 compaction::state::CompactionState, config::Config, table::Table, version::Version, HashSet,
8 KvPair, TableId,
9};
10
// Unit tests for the tiered strategy, compiled only for test builds.
#[cfg(test)]
mod tests;

/// Strategy name reported by [`CompactionStrategy::get_name`].
#[doc(hidden)]
pub const NAME: &str = "SizeTieredCompaction";
16
/// Size-tiered compaction strategy (STCS), also known as Universal compaction.
///
/// All sorted runs live in L0. When enough similarly-sized runs accumulate,
/// they are merged into a single larger run (still in L0). This minimizes
/// write amplification at the cost of higher read and space amplification.
///
/// Best for write-heavy workloads: posting list merges, counters, time-series
/// append-only data.
///
/// # Algorithm
///
/// 1. **Space amplification check:** if `total_size / largest_run_size - 1`
///    exceeds [`max_space_amplification_percent`](Strategy::with_max_space_amplification_percent),
///    all runs are merged (full compaction).
/// 2. **Size-ratio merge:** runs are sorted by size (smallest first). The
///    longest prefix where each consecutive pair satisfies
///    `next.size / prev.size <= 1.0 + size_ratio` is selected. If the prefix
///    length ≥ [`min_merge_width`](Strategy::with_min_merge_width), those runs
///    are merged.
///
/// # Trade-offs vs Leveled
///
/// | Metric | STCS | Leveled |
/// |--------|------|---------|
/// | Write amplification | ~O(N/T) | ~O(T×L) |
/// | Read amplification | Higher (more runs) | Lower (1 run per level) |
/// | Space amplification | Up to 2× temporary | ~1.1× |
#[derive(Clone)]
pub struct Strategy {
    /// Maximum allowed size ratio between adjacent sorted runs (by size) for
    /// them to be considered "similar" and eligible for merging together.
    ///
    /// For two adjacent runs sorted by size, if `larger / smaller <= 1.0 + size_ratio`,
    /// they are considered similar.
    ///
    /// Default = 1.0 (adjacent run can be up to 2× the previous).
    size_ratio: f64,

    /// Minimum number of similarly-sized sorted runs required before
    /// triggering a merge.
    ///
    /// [`Strategy::with_min_merge_width`] clamps this to at least 2.
    ///
    /// Default = 4.
    min_merge_width: usize,

    /// Maximum number of sorted runs to merge at once.
    ///
    /// [`Strategy::with_max_merge_width`] clamps this to at least 2.
    ///
    /// Default = `usize::MAX` (unlimited).
    max_merge_width: usize,

    /// When space amplification exceeds this percentage, a full compaction
    /// of all runs is triggered.
    ///
    /// Space amplification is computed as `(total_size / largest_run_size - 1) × 100`.
    ///
    /// Default = 200 (i.e. 200%, meaning total data can be up to 3× the largest run).
    max_space_amplification_percent: u64,

    /// Target table size on disk (possibly compressed) for output tables.
    ///
    /// Default = 64 MiB.
    target_size: u64,
}
79
80impl Default for Strategy {
81 fn default() -> Self {
82 Self {
83 size_ratio: 1.0,
84 min_merge_width: 4,
85 max_merge_width: usize::MAX,
86 max_space_amplification_percent: 200,
87 target_size: 64 * 1_024 * 1_024,
88 }
89 }
90}
91
92impl Strategy {
93 /// Sets the size ratio threshold for considering runs "similar".
94 ///
95 /// Two adjacent runs (sorted by size) are similar if
96 /// `larger / smaller <= 1.0 + size_ratio`.
97 ///
98 /// Same as `compaction_options_universal.size_ratio` in `RocksDB`.
99 ///
100 /// Default = 1.0
101 #[must_use]
102 pub fn with_size_ratio(mut self, ratio: f64) -> Self {
103 // Clamp invalid values: NaN, negative, and infinite are replaced
104 // with the default (1.0). Zero is allowed (exact-size-match only).
105 self.size_ratio = if ratio.is_finite() && ratio >= 0.0 {
106 ratio
107 } else {
108 1.0
109 };
110 self
111 }
112
113 /// Sets the minimum number of runs to merge at once.
114 ///
115 /// Same as `compaction_options_universal.min_merge_width` in `RocksDB`.
116 ///
117 /// Default = 4
118 #[must_use]
119 pub fn with_min_merge_width(mut self, width: usize) -> Self {
120 self.min_merge_width = width.max(2);
121 self
122 }
123
124 /// Sets the maximum number of runs to merge at once.
125 ///
126 /// Same as `compaction_options_universal.max_merge_width` in `RocksDB`.
127 ///
128 /// Default = `usize::MAX`
129 #[must_use]
130 pub fn with_max_merge_width(mut self, width: usize) -> Self {
131 self.max_merge_width = width.max(2);
132 self
133 }
134
135 /// Sets the space amplification threshold (in percent) that triggers
136 /// a full compaction of all runs.
137 ///
138 /// Same as `compaction_options_universal.max_size_amplification_percent` in `RocksDB`.
139 ///
140 /// Default = 200
141 #[must_use]
142 pub fn with_max_space_amplification_percent(mut self, percent: u64) -> Self {
143 self.max_space_amplification_percent = percent;
144 self
145 }
146
147 /// Sets the target table size on disk (possibly compressed).
148 ///
149 /// Default = 64 MiB
150 #[must_use]
151 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
152 self.target_size = bytes;
153 self
154 }
155}
156
/// Per-run metadata used as input for compaction decisions.
struct RunInfo {
    /// Total on-disk size of all tables in this run (sum of table file sizes).
    size: u64,

    /// Table IDs belonging to this run.
    table_ids: Vec<TableId>,
}
165
166/// Collects run information from L0, filtering out runs with hidden tables.
167fn collect_available_runs(version: &Version, state: &CompactionState) -> Vec<RunInfo> {
168 let l0 = version.l0();
169
170 l0.iter()
171 .filter_map(|run| {
172 // Skip runs that have any table in the hidden set (being compacted)
173 if run
174 .iter()
175 .any(|table| state.hidden_set().is_hidden(table.id()))
176 {
177 return None;
178 }
179
180 let size = run.iter().map(Table::file_size).sum::<u64>();
181 let table_ids = run.iter().map(Table::id).collect();
182
183 Some(RunInfo { size, table_ids })
184 })
185 .collect()
186}
187
188impl CompactionStrategy for Strategy {
189 fn get_name(&self) -> &'static str {
190 NAME
191 }
192
193 fn get_config(&self) -> Vec<KvPair> {
194 use byteorder::{LittleEndian, WriteBytesExt};
195
196 let mut size_ratio_bytes = vec![];
197 #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
198 size_ratio_bytes
199 .write_f64::<LittleEndian>(self.size_ratio)
200 .expect("cannot fail");
201
202 vec![
203 (
204 crate::UserKey::from("tiered_size_ratio"),
205 crate::UserValue::from(size_ratio_bytes),
206 ),
207 (
208 crate::UserKey::from("tiered_min_merge_width"),
209 crate::UserValue::from(
210 #[expect(
211 clippy::cast_possible_truncation,
212 reason = "min_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
213 )]
214 (self.min_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
215 ),
216 ),
217 (
218 crate::UserKey::from("tiered_max_merge_width"),
219 crate::UserValue::from(
220 #[expect(
221 clippy::cast_possible_truncation,
222 reason = "max_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
223 )]
224 (self.max_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
225 ),
226 ),
227 (
228 crate::UserKey::from("tiered_max_space_amp_pct"),
229 crate::UserValue::from(self.max_space_amplification_percent.to_le_bytes()),
230 ),
231 (
232 crate::UserKey::from("tiered_target_size"),
233 crate::UserValue::from(self.target_size.to_le_bytes()),
234 ),
235 ]
236 }
237
238 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
239 let runs = collect_available_runs(version, state);
240
241 if runs.len() < 2 {
242 return Choice::DoNothing;
243 }
244
245 // --- Space amplification check ---
246 //
247 // The largest run is treated as the "base" data set. Everything else
248 // is overhead. If overhead exceeds the threshold, compact everything.
249 let total_size: u64 = runs.iter().map(|r| r.size).sum();
250 let largest_run_size = runs.iter().map(|r| r.size).max().unwrap_or(0);
251
252 if largest_run_size > 0 {
253 // Integer arithmetic to avoid f64 precision loss on large sizes.
254 // (total / largest - 1) * 100 >= threshold
255 // is equivalent to:
256 // total * 100 >= largest * (100 + threshold)
257 let lhs = u128::from(total_size).saturating_mul(100);
258 let rhs = u128::from(largest_run_size)
259 .saturating_mul(100 + u128::from(self.max_space_amplification_percent));
260
261 if lhs >= rhs {
262 let table_ids: HashSet<TableId> = runs
263 .iter()
264 .flat_map(|r| r.table_ids.iter().copied())
265 .collect();
266
267 return Choice::Merge(CompactionInput {
268 table_ids,
269 dest_level: 0,
270 canonical_level: 0,
271 target_size: self.target_size,
272 });
273 }
274 }
275
276 // --- Size-ratio triggered merge ---
277 //
278 // Sort runs by size (smallest first), then find the longest prefix
279 // where adjacent runs have similar sizes.
280 let mut sorted_runs = runs;
281 sorted_runs.sort_by(|a, b| a.size.cmp(&b.size));
282
283 let mut prefix_len = 1;
284
285 for window in sorted_runs.windows(2) {
286 // NOTE: windows(2) guarantees exactly 2 elements
287 let (Some(smaller), Some(larger)) = (window.first(), window.get(1)) else {
288 unreachable!("windows(2) always yields slices of length 2");
289 };
290
291 if smaller.size == 0 {
292 // Zero-size run: always "similar" to the next
293 prefix_len += 1;
294 continue;
295 }
296
297 #[expect(
298 clippy::cast_precision_loss,
299 reason = "precision loss is acceptable for ratio comparison"
300 )]
301 let ratio = larger.size as f64 / smaller.size as f64;
302
303 if ratio <= 1.0 + self.size_ratio {
304 prefix_len += 1;
305 } else {
306 break;
307 }
308 }
309
310 if prefix_len >= self.min_merge_width {
311 // Cap at max_merge_width, but ensure we still meet min_merge_width
312 // (guards against misconfigured max < min)
313 let merge_count = prefix_len.min(self.max_merge_width);
314
315 if merge_count >= self.min_merge_width {
316 let table_ids: HashSet<TableId> = sorted_runs
317 .iter()
318 .take(merge_count)
319 .flat_map(|r| r.table_ids.iter().copied())
320 .collect();
321
322 return Choice::Merge(CompactionInput {
323 table_ids,
324 dest_level: 0,
325 canonical_level: 0,
326 target_size: self.target_size,
327 });
328 }
329 }
330
331 Choice::DoNothing
332 }
333}