Skip to main content

lsm_tree/compaction/tiered/
mod.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7    compaction::state::CompactionState, config::Config, table::Table, version::Version, HashSet,
8    KvPair, TableId,
9};
10
11#[cfg(test)]
12mod tests;
13
// Strategy identifier returned by `CompactionStrategy::get_name`.
// Hidden from docs: it is an internal naming detail, not public API surface.
#[doc(hidden)]
pub const NAME: &str = "SizeTieredCompaction";
16
/// Size-tiered compaction strategy (STCS), also known as Universal compaction.
///
/// All sorted runs live in L0. When enough similarly-sized runs accumulate,
/// they are merged into a single larger run (still in L0). This minimizes
/// write amplification at the cost of higher read and space amplification.
///
/// Best for write-heavy workloads: posting list merges, counters, time-series
/// append-only data.
///
/// # Algorithm
///
/// 1. **Space amplification check:** if `total_size / largest_run_size - 1`
///    exceeds [`max_space_amplification_percent`](Strategy::with_max_space_amplification_percent),
///    all runs are merged (full compaction).
/// 2. **Size-ratio merge:** runs are sorted by size (smallest first). The
///    longest prefix where each consecutive pair satisfies
///    `next.size / prev.size <= 1.0 + size_ratio` is selected. If the prefix
///    length ≥ [`min_merge_width`](Strategy::with_min_merge_width), those runs
///    are merged (capped at [`max_merge_width`](Strategy::with_max_merge_width)).
///
/// # Trade-offs vs Leveled
///
/// | Metric | STCS | Leveled |
/// |--------|------|---------|
/// | Write amplification | ~O(N/T) | ~O(T×L) |
/// | Read amplification | Higher (more runs) | Lower (1 run per level) |
/// | Space amplification | Up to 2× temporary | ~1.1× |
#[derive(Clone)]
pub struct Strategy {
    /// Maximum allowed size ratio between adjacent sorted runs (by size) for
    /// them to be considered "similar" and eligible for merging together.
    ///
    /// For two adjacent runs sorted by size, if `larger / smaller <= 1.0 + size_ratio`,
    /// they are considered similar.
    ///
    /// Default = 1.0 (adjacent run can be up to 2× the previous).
    size_ratio: f64,

    /// Minimum number of similarly-sized sorted runs required before
    /// triggering a merge. Clamped to at least 2 by the builder.
    ///
    /// Default = 4.
    min_merge_width: usize,

    /// Maximum number of sorted runs to merge at once. Clamped to at least 2
    /// by the builder.
    ///
    /// Default = `usize::MAX` (unlimited).
    max_merge_width: usize,

    /// When space amplification exceeds this percentage, a full compaction
    /// of all runs is triggered.
    ///
    /// Space amplification is computed as `(total_size / largest_run_size - 1) × 100`.
    ///
    /// Default = 200 (i.e. 200%, meaning total data can be up to 3× the largest run).
    max_space_amplification_percent: u64,

    /// Target table size on disk (possibly compressed) for output tables.
    ///
    /// Default = 64 MiB.
    target_size: u64,
}
79
80impl Default for Strategy {
81    fn default() -> Self {
82        Self {
83            size_ratio: 1.0,
84            min_merge_width: 4,
85            max_merge_width: usize::MAX,
86            max_space_amplification_percent: 200,
87            target_size: 64 * 1_024 * 1_024,
88        }
89    }
90}
91
92impl Strategy {
93    /// Sets the size ratio threshold for considering runs "similar".
94    ///
95    /// Two adjacent runs (sorted by size) are similar if
96    /// `larger / smaller <= 1.0 + size_ratio`.
97    ///
98    /// Same as `compaction_options_universal.size_ratio` in `RocksDB`.
99    ///
100    /// Default = 1.0
101    #[must_use]
102    pub fn with_size_ratio(mut self, ratio: f64) -> Self {
103        // Clamp invalid values: NaN, negative, and infinite are replaced
104        // with the default (1.0). Zero is allowed (exact-size-match only).
105        self.size_ratio = if ratio.is_finite() && ratio >= 0.0 {
106            ratio
107        } else {
108            1.0
109        };
110        self
111    }
112
113    /// Sets the minimum number of runs to merge at once.
114    ///
115    /// Same as `compaction_options_universal.min_merge_width` in `RocksDB`.
116    ///
117    /// Default = 4
118    #[must_use]
119    pub fn with_min_merge_width(mut self, width: usize) -> Self {
120        self.min_merge_width = width.max(2);
121        self
122    }
123
124    /// Sets the maximum number of runs to merge at once.
125    ///
126    /// Same as `compaction_options_universal.max_merge_width` in `RocksDB`.
127    ///
128    /// Default = `usize::MAX`
129    #[must_use]
130    pub fn with_max_merge_width(mut self, width: usize) -> Self {
131        self.max_merge_width = width.max(2);
132        self
133    }
134
135    /// Sets the space amplification threshold (in percent) that triggers
136    /// a full compaction of all runs.
137    ///
138    /// Same as `compaction_options_universal.max_size_amplification_percent` in `RocksDB`.
139    ///
140    /// Default = 200
141    #[must_use]
142    pub fn with_max_space_amplification_percent(mut self, percent: u64) -> Self {
143        self.max_space_amplification_percent = percent;
144        self
145    }
146
147    /// Sets the target table size on disk (possibly compressed).
148    ///
149    /// Default = 64 MiB
150    #[must_use]
151    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
152        self.target_size = bytes;
153        self
154    }
155}
156
/// Per-run metadata for compaction decisions.
///
/// A lightweight snapshot of one L0 sorted run: its aggregate on-disk size
/// and the IDs of the tables it contains.
struct RunInfo {
    /// Total on-disk size (sum of `Table::file_size`) of all tables in this run.
    size: u64,

    /// Table IDs belonging to this run.
    table_ids: Vec<TableId>,
}
165
166/// Collects run information from L0, filtering out runs with hidden tables.
167fn collect_available_runs(version: &Version, state: &CompactionState) -> Vec<RunInfo> {
168    let l0 = version.l0();
169
170    l0.iter()
171        .filter_map(|run| {
172            // Skip runs that have any table in the hidden set (being compacted)
173            if run
174                .iter()
175                .any(|table| state.hidden_set().is_hidden(table.id()))
176            {
177                return None;
178            }
179
180            let size = run.iter().map(Table::file_size).sum::<u64>();
181            let table_ids = run.iter().map(Table::id).collect();
182
183            Some(RunInfo { size, table_ids })
184        })
185        .collect()
186}
187
188impl CompactionStrategy for Strategy {
189    fn get_name(&self) -> &'static str {
190        NAME
191    }
192
193    fn get_config(&self) -> Vec<KvPair> {
194        use byteorder::{LittleEndian, WriteBytesExt};
195
196        let mut size_ratio_bytes = vec![];
197        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
198        size_ratio_bytes
199            .write_f64::<LittleEndian>(self.size_ratio)
200            .expect("cannot fail");
201
202        vec![
203            (
204                crate::UserKey::from("tiered_size_ratio"),
205                crate::UserValue::from(size_ratio_bytes),
206            ),
207            (
208                crate::UserKey::from("tiered_min_merge_width"),
209                crate::UserValue::from(
210                    #[expect(
211                        clippy::cast_possible_truncation,
212                        reason = "min_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
213                    )]
214                    (self.min_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
215                ),
216            ),
217            (
218                crate::UserKey::from("tiered_max_merge_width"),
219                crate::UserValue::from(
220                    #[expect(
221                        clippy::cast_possible_truncation,
222                        reason = "max_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
223                    )]
224                    (self.max_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
225                ),
226            ),
227            (
228                crate::UserKey::from("tiered_max_space_amp_pct"),
229                crate::UserValue::from(self.max_space_amplification_percent.to_le_bytes()),
230            ),
231            (
232                crate::UserKey::from("tiered_target_size"),
233                crate::UserValue::from(self.target_size.to_le_bytes()),
234            ),
235        ]
236    }
237
238    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
239        let runs = collect_available_runs(version, state);
240
241        if runs.len() < 2 {
242            return Choice::DoNothing;
243        }
244
245        // --- Space amplification check ---
246        //
247        // The largest run is treated as the "base" data set. Everything else
248        // is overhead. If overhead exceeds the threshold, compact everything.
249        let total_size: u64 = runs.iter().map(|r| r.size).sum();
250        let largest_run_size = runs.iter().map(|r| r.size).max().unwrap_or(0);
251
252        if largest_run_size > 0 {
253            // Integer arithmetic to avoid f64 precision loss on large sizes.
254            //   (total / largest - 1) * 100 >= threshold
255            // is equivalent to:
256            //   total * 100 >= largest * (100 + threshold)
257            let lhs = u128::from(total_size).saturating_mul(100);
258            let rhs = u128::from(largest_run_size)
259                .saturating_mul(100 + u128::from(self.max_space_amplification_percent));
260
261            if lhs >= rhs {
262                let table_ids: HashSet<TableId> = runs
263                    .iter()
264                    .flat_map(|r| r.table_ids.iter().copied())
265                    .collect();
266
267                return Choice::Merge(CompactionInput {
268                    table_ids,
269                    dest_level: 0,
270                    canonical_level: 0,
271                    target_size: self.target_size,
272                });
273            }
274        }
275
276        // --- Size-ratio triggered merge ---
277        //
278        // Sort runs by size (smallest first), then find the longest prefix
279        // where adjacent runs have similar sizes.
280        let mut sorted_runs = runs;
281        sorted_runs.sort_by(|a, b| a.size.cmp(&b.size));
282
283        let mut prefix_len = 1;
284
285        for window in sorted_runs.windows(2) {
286            // NOTE: windows(2) guarantees exactly 2 elements
287            let (Some(smaller), Some(larger)) = (window.first(), window.get(1)) else {
288                unreachable!("windows(2) always yields slices of length 2");
289            };
290
291            if smaller.size == 0 {
292                // Zero-size run: always "similar" to the next
293                prefix_len += 1;
294                continue;
295            }
296
297            #[expect(
298                clippy::cast_precision_loss,
299                reason = "precision loss is acceptable for ratio comparison"
300            )]
301            let ratio = larger.size as f64 / smaller.size as f64;
302
303            if ratio <= 1.0 + self.size_ratio {
304                prefix_len += 1;
305            } else {
306                break;
307            }
308        }
309
310        if prefix_len >= self.min_merge_width {
311            // Cap at max_merge_width, but ensure we still meet min_merge_width
312            // (guards against misconfigured max < min)
313            let merge_count = prefix_len.min(self.max_merge_width);
314
315            if merge_count >= self.min_merge_width {
316                let table_ids: HashSet<TableId> = sorted_runs
317                    .iter()
318                    .take(merge_count)
319                    .flat_map(|r| r.table_ids.iter().copied())
320                    .collect();
321
322                return Choice::Merge(CompactionInput {
323                    table_ids,
324                    dest_level: 0,
325                    canonical_level: 0,
326                    target_size: self.target_size,
327                });
328            }
329        }
330
331        Choice::DoNothing
332    }
333}