Skip to main content

lsm_tree/compaction/tiered/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7    HashSet, KvPair, TableId, compaction::state::CompactionState, config::Config, table::Table,
8    version::Version,
9};
10#[cfg(not(feature = "std"))]
11use alloc::vec::Vec;
12
13#[cfg(test)]
14mod tests;
15
/// Identifier of the size-tiered strategy, as reported by `get_name`.
#[doc(hidden)]
pub const NAME: &str = "SizeTieredCompaction";
18
/// Size-tiered compaction strategy (STCS), also known as Universal compaction.
///
/// All sorted runs live in L0. When enough similarly-sized runs accumulate,
/// they are merged into a single larger run (still in L0). This minimizes
/// write amplification at the cost of higher read and space amplification.
///
/// Best for write-heavy workloads: posting list merges, counters, time-series
/// append-only data.
///
/// # Algorithm
///
/// Only runs that are not already being compacted are considered.
///
/// 1. **Space amplification check:** if `(total_size / largest_run_size - 1) × 100`
///    reaches or exceeds
///    [`max_space_amplification_percent`](Strategy::with_max_space_amplification_percent),
///    all considered runs are merged (full compaction).
/// 2. **Size-ratio merge:** runs are sorted by size (smallest first). The
///    longest prefix where each consecutive pair satisfies
///    `next.size / prev.size <= 1.0 + size_ratio` is selected and capped at
///    [`max_merge_width`](Strategy::with_max_merge_width) runs. If the capped
///    prefix still contains at least
///    [`min_merge_width`](Strategy::with_min_merge_width) runs, those runs
///    are merged.
///
/// # Trade-offs vs Leveled
///
/// | Metric | STCS | Leveled |
/// |--------|------|---------|
/// | Write amplification | ~O(N/T) | ~O(T×L) |
/// | Read amplification | Higher (more runs) | Lower (1 run per level) |
/// | Space amplification | Up to 2× temporary | ~1.1× |
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Maximum allowed size ratio between adjacent sorted runs (by size) for
    /// them to be considered "similar" and eligible for merging together.
    ///
    /// For two adjacent runs sorted by size, if `larger / smaller <= 1.0 + size_ratio`,
    /// they are considered similar.
    ///
    /// Default = 1.0 (adjacent run can be up to 2× the previous).
    size_ratio: f64,

    /// Minimum number of similarly-sized sorted runs required before
    /// triggering a size-ratio merge.
    ///
    /// Default = 4.
    min_merge_width: usize,

    /// Maximum number of sorted runs picked by a single size-ratio merge.
    ///
    /// Default = `usize::MAX` (unlimited).
    max_merge_width: usize,

    /// When space amplification reaches or exceeds this percentage, a full
    /// compaction of all considered runs is triggered.
    ///
    /// Space amplification is computed as `(total_size / largest_run_size - 1) × 100`.
    ///
    /// Default = 200 (i.e. 200%: the full compaction fires once total data
    /// reaches 3× the largest run).
    max_space_amplification_percent: u64,

    /// Target table size on disk (possibly compressed) for output tables.
    ///
    /// Default = 64 MiB.
    target_size: u64,
}
81
82impl Default for Strategy {
83    fn default() -> Self {
84        Self {
85            size_ratio: 1.0,
86            min_merge_width: 4,
87            max_merge_width: usize::MAX,
88            max_space_amplification_percent: 200,
89            target_size: 64 * 1_024 * 1_024,
90        }
91    }
92}
93
94impl Strategy {
95    /// Sets the size ratio threshold for considering runs "similar".
96    ///
97    /// Two adjacent runs (sorted by size) are similar if
98    /// `larger / smaller <= 1.0 + size_ratio`.
99    ///
100    /// Same as `compaction_options_universal.size_ratio` in `RocksDB`.
101    ///
102    /// Default = 1.0
103    #[must_use]
104    pub fn with_size_ratio(mut self, ratio: f64) -> Self {
105        // Clamp invalid values: NaN, negative, and infinite are replaced
106        // with the default (1.0). Zero is allowed (exact-size-match only).
107        self.size_ratio = if ratio.is_finite() && ratio >= 0.0 {
108            ratio
109        } else {
110            1.0
111        };
112        self
113    }
114
115    /// Sets the minimum number of runs to merge at once.
116    ///
117    /// Same as `compaction_options_universal.min_merge_width` in `RocksDB`.
118    ///
119    /// Default = 4
120    #[must_use]
121    pub fn with_min_merge_width(mut self, width: usize) -> Self {
122        self.min_merge_width = width.max(2);
123        self
124    }
125
126    /// Sets the maximum number of runs to merge at once.
127    ///
128    /// Same as `compaction_options_universal.max_merge_width` in `RocksDB`.
129    ///
130    /// Default = `usize::MAX`
131    #[must_use]
132    pub fn with_max_merge_width(mut self, width: usize) -> Self {
133        self.max_merge_width = width.max(2);
134        self
135    }
136
137    /// Sets the space amplification threshold (in percent) that triggers
138    /// a full compaction of all runs.
139    ///
140    /// Same as `compaction_options_universal.max_size_amplification_percent` in `RocksDB`.
141    ///
142    /// Default = 200
143    #[must_use]
144    pub fn with_max_space_amplification_percent(mut self, percent: u64) -> Self {
145        self.max_space_amplification_percent = percent;
146        self
147    }
148
149    /// Sets the target table size on disk (possibly compressed).
150    ///
151    /// Default = 64 MiB
152    #[must_use]
153    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
154        self.target_size = bytes;
155        self
156    }
157}
158
/// Per-run metadata for compaction decisions.
///
/// One value describes one L0 run: how large it is and which tables it
/// consists of.
struct RunInfo {
    /// Total on-disk size of all tables in this run (sum of their file sizes).
    size: u64,

    /// Table IDs belonging to this run.
    table_ids: Vec<TableId>,
}
167
168/// Collects run information from L0, filtering out runs with hidden tables.
169fn collect_available_runs(version: &Version, state: &CompactionState) -> Vec<RunInfo> {
170    let l0 = version.l0();
171
172    l0.iter()
173        .filter_map(|run| {
174            // Skip runs that have any table in the hidden set (being compacted)
175            if run
176                .iter()
177                .any(|table| state.hidden_set().is_hidden(table.id()))
178            {
179                return None;
180            }
181
182            let size = run.iter().map(Table::file_size).sum::<u64>();
183            let table_ids = run.iter().map(Table::id).collect();
184
185            Some(RunInfo { size, table_ids })
186        })
187        .collect()
188}
189
190impl CompactionStrategy for Strategy {
    /// Returns the strategy identifier, [`NAME`].
    fn get_name(&self) -> &'static str {
        NAME
    }
194
195    fn get_config(&self) -> Vec<KvPair> {
196        use crate::io::{LittleEndian, WriteBytesExt};
197
198        let mut size_ratio_bytes = vec![];
199        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
200        size_ratio_bytes
201            .write_f64::<LittleEndian>(self.size_ratio)
202            .expect("cannot fail");
203
204        vec![
205            (
206                crate::UserKey::from("tiered_size_ratio"),
207                crate::UserValue::from(size_ratio_bytes),
208            ),
209            (
210                crate::UserKey::from("tiered_min_merge_width"),
211                crate::UserValue::from(
212                    #[expect(
213                        clippy::cast_possible_truncation,
214                        reason = "min_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
215                    )]
216                    (self.min_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
217                ),
218            ),
219            (
220                crate::UserKey::from("tiered_max_merge_width"),
221                crate::UserValue::from(
222                    #[expect(
223                        clippy::cast_possible_truncation,
224                        reason = "max_merge_width fits in u32 for persistence; usize::MAX maps to u32::MAX"
225                    )]
226                    (self.max_merge_width.min(u32::MAX as usize) as u32).to_le_bytes(),
227                ),
228            ),
229            (
230                crate::UserKey::from("tiered_max_space_amp_pct"),
231                crate::UserValue::from(self.max_space_amplification_percent.to_le_bytes()),
232            ),
233            (
234                crate::UserKey::from("tiered_target_size"),
235                crate::UserValue::from(self.target_size.to_le_bytes()),
236            ),
237        ]
238    }
239
240    fn pending_compaction_bytes(&self, version: &Version) -> u64 {
241        // Tiered debt mirrors the space-amplification trigger in `choose`: the
242        // bytes by which total L0 size exceeds the budget
243        // `largest_run * (1 + max_space_amplification_percent / 100)`. This makes
244        // the bytes-axis backpressure (bytes_slowdown / bytes_stop) engage for
245        // size-tiered trees, not only leveled. All runs are counted, including
246        // any already being compacted: the redundancy is on disk until the full
247        // compaction lands. Zero with fewer than two runs (nothing to reclaim).
248        let l0 = version.l0();
249        let mut total: u128 = 0;
250        let mut largest: u128 = 0;
251        let mut run_count = 0usize;
252        for run in l0.iter() {
253            let size: u128 = run.iter().map(|t| u128::from(Table::file_size(t))).sum();
254            total += size;
255            largest = largest.max(size);
256            run_count += 1;
257        }
258        if run_count < 2 || largest == 0 {
259            return 0;
260        }
261        // Same *100-scaled comparison as `choose` (avoids f64 precision loss);
262        // `rhs` saturates so an effectively unbounded threshold yields zero debt.
263        let lhs = total * 100;
264        let rhs = largest.saturating_mul(100 + u128::from(self.max_space_amplification_percent));
265        // saturating_sub: debt floors at zero by definition (none within budget).
266        let debt = lhs.saturating_sub(rhs) / 100;
267        u64::try_from(debt).unwrap_or(u64::MAX)
268    }
269
270    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
271        let runs = collect_available_runs(version, state);
272
273        if runs.len() < 2 {
274            return Choice::DoNothing;
275        }
276
277        // --- Space amplification check ---
278        //
279        // The largest run is treated as the "base" data set. Everything else
280        // is overhead. If overhead exceeds the threshold, compact everything.
281        let total_size: u64 = runs.iter().map(|r| r.size).sum();
282        let largest_run_size = runs.iter().map(|r| r.size).max().unwrap_or(0);
283
284        if largest_run_size > 0 {
285            // Integer arithmetic to avoid f64 precision loss on large sizes.
286            //   (total / largest - 1) * 100 >= threshold
287            // is equivalent to:
288            //   total * 100 >= largest * (100 + threshold)
289            // `lhs` multiplies a u64-derived u128 by the constant 100, which
290            // cannot overflow u128 — plain multiply. `rhs` multiplies by
291            // `100 + max_space_amplification_percent`, and the percentage is an
292            // unvalidated u64 (tests pass u64::MAX), so that product CAN exceed
293            // u128 — saturate. When `rhs` saturates to u128::MAX the `lhs >= rhs`
294            // check below cannot fire, so a huge amplification bound simply never
295            // triggers a space-amplification compaction, matching the intent of a
296            // very high threshold (tolerate maximum amplification).
297            let lhs = u128::from(total_size) * 100;
298            let rhs = u128::from(largest_run_size)
299                .saturating_mul(100 + u128::from(self.max_space_amplification_percent));
300
301            if lhs >= rhs {
302                let table_ids: HashSet<TableId> = runs
303                    .iter()
304                    .flat_map(|r| r.table_ids.iter().copied())
305                    .collect();
306
307                return Choice::Merge(CompactionInput {
308                    table_ids,
309                    dest_level: 0,
310                    canonical_level: 0,
311                    target_size: self.target_size,
312                });
313            }
314        }
315
316        // --- Size-ratio triggered merge ---
317        //
318        // Sort runs by size (smallest first), then find the longest prefix
319        // where adjacent runs have similar sizes.
320        let mut sorted_runs = runs;
321        sorted_runs.sort_by(|a, b| a.size.cmp(&b.size));
322
323        let mut prefix_len = 1;
324
325        for window in sorted_runs.windows(2) {
326            // NOTE: windows(2) guarantees exactly 2 elements
327            let (Some(smaller), Some(larger)) = (window.first(), window.get(1)) else {
328                unreachable!("windows(2) always yields slices of length 2");
329            };
330
331            if smaller.size == 0 {
332                // Zero-size run: always "similar" to the next
333                prefix_len += 1;
334                continue;
335            }
336
337            #[expect(
338                clippy::cast_precision_loss,
339                reason = "precision loss is acceptable for ratio comparison"
340            )]
341            let ratio = larger.size as f64 / smaller.size as f64;
342
343            if ratio <= 1.0 + self.size_ratio {
344                prefix_len += 1;
345            } else {
346                break;
347            }
348        }
349
350        if prefix_len >= self.min_merge_width {
351            // Cap at max_merge_width, but ensure we still meet min_merge_width
352            // (guards against misconfigured max < min)
353            let merge_count = prefix_len.min(self.max_merge_width);
354
355            if merge_count >= self.min_merge_width {
356                let table_ids: HashSet<TableId> = sorted_runs
357                    .iter()
358                    .take(merge_count)
359                    .flat_map(|r| r.table_ids.iter().copied())
360                    .collect();
361
362                return Choice::Merge(CompactionInput {
363                    table_ids,
364                    dest_level: 0,
365                    canonical_level: 0,
366                    target_size: self.target_size,
367                });
368            }
369        }
370
371        Choice::DoNothing
372    }
373}