lsm_tree/compaction/leveled/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

#[cfg(test)]
mod test;

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::{util::aggregate_run_key_range, Table},
    version::{Run, Version},
    HashSet, TableId,
};

/// Tries to find the optimal set of tables to compact from one level into the next.
///
/// Returns the IDs of the tables to compact, plus a flag that is `true`
/// if the set can be trivially moved (i.e. it does not overlap the next level).
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    _overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
    // NOTE: Find the largest trivial move (if one exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked by another
            // ongoing compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap at ~50 tables' worth of data per compaction for now
                //
                // At this point, all compactions are too large anyway,
                // so we can escape early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked by another
                    // ongoing compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull all tables in the current level that are contained
                // in the window's key range into the compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked by another
                    // ongoing compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                // let compaction_bytes = curr_level_size + next_level_size;

                // NOTE: Lower is better - fewer next-level bytes rewritten
                // per current-level byte merged down
                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";

/// Leveled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 1 has a target size of `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
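///
/// # Examples
///
/// An illustrative configuration sketch (using the builder methods defined
/// below; the values shown here mirror the defaults):
///
/// ```ignore
/// let strategy = Strategy::default()
///     .with_l0_threshold(4)
///     .with_table_target_size(/* 64 MiB */ 64 * 1_024 * 1_024)
///     .with_level_ratio_policy(vec![10.0]);
/// ```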
#[derive(Clone)]
pub struct Strategy {
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
    ///
    /// If the policy contains fewer entries than there are levels,
    /// the last entry is reused for all deeper levels.
    ///
    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
    ///
    /// Default = [10.0]
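    ///
    /// # Examples
    ///
    /// For example, with a policy of `[8.0, 10.0]`, L2's target size is
    /// 8x that of L1, and every deeper level grows by 10x (illustrative
    /// sketch; see `level_target_size` below):
    ///
    /// ```ignore
    /// let strategy = Strategy::default().with_level_ratio_policy(vec![8.0, 10.0]);
    /// ```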
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Sets the L0 threshold.
    ///
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    ///
    /// Default = 4
    #[must_use]
    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
        self.l0_threshold = threshold;
        self
    }

    /// Sets the table target size on disk (possibly compressed).
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    ///
    /// Default = 64 MiB
    #[must_use]
    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
        self.target_size = bytes;
        self
    }

    /// Calculates the target size of L1 (`target_size * l0_threshold`).
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
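    ///
    /// For example, with the default config (64 MiB table target size,
    /// L0 threshold of 4, ratio 10), the base size is 256 MiB, so the
    /// targets work out to (illustrative sketch):
    ///
    /// ```ignore
    /// assert_eq!(strategy.level_target_size(1), /* 256 MiB */ 268_435_456);
    /// assert_eq!(strategy.level_target_size(2), /* 2.5 GiB */ 2_684_354_560);
    /// assert_eq!(strategy.level_target_size(3), /* 25 GiB  */ 26_843_545_600);
    /// ```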
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            // u64::from(self.target_size)
            self.level_base_size()
        } else {
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because L0 and L1 are already covered by the base size
            for idx in 0..=(canonical_level_idx - 2) {
                // If the policy is shorter than needed, reuse its last entry
                // (falling back to 10.0 if the policy is empty)
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            size as u64
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

                    // Encoding: a u8 length prefix, followed by each
                    // ratio as a little-endian f32
                    let mut v = vec![];

                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level idx to the canonical one
        let mut level_shift = canonical_l1_idx - 1;
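
        // For example: if, apart from L0, only levels 3 and 4 hold data,
        // then canonical_l1_idx == 3 and level_shift == 2, so the actual
        // level 4 is treated as the canonical L2 when computing target sizes.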

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Ignore tables that are already being compacted,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move L1 up one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Check if any table in the next level overlaps our key range
                let first_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if first_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score the first level

            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }
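
            // For example, 6 tables in L0 against a threshold of 4 yield a
            // score of 1.5; scores below 1.0 never trigger a compaction.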

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Ignore tables that are already being compacted,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: idx < 7 because we asserted the level count above
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move if the next level is empty
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            /* eprintln!(
                "merge {} tables, L0->L1: {:?}",
                choice.table_ids.len(),
                choice.table_ids,
            ); */

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        /* eprintln!(
            "{} {} tables, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.table_ids.len(),
            next_level_index - 1,
            choice.table_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}