lsm_tree/compaction/leveled/mod.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

#[cfg(test)]
mod test;

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::{util::aggregate_run_key_range, Table},
    version::{Run, Version},
    HashSet, TableId,
};

/// Tries to find an optimal compaction set from one level into the next.
///
/// Returns the set of table IDs to compact and whether the set can be trivially moved.
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
    // NOTE: Find the largest trivial move (if one exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked by another
            // ongoing compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in the next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap the pulled-in next-level size at 50x the table base size for now
                //
                // At this point, all compactions are too large anyway,
                // so we can bail out early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked by another
                    // ongoing compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull all tables in the current level that are contained
                // in the key range into the compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                if curr_level_size == 0 {
                    return None;
                }

                // TODO: enabling this check can deadlock compactions: if every
                //  candidate pulls in less than the overshoot, no compaction
                //  would ever be chosen
                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked by another
                    // ongoing compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                let compaction_bytes = curr_level_size + next_level_size;

                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);
                Some((window, curr_level_pull_in, write_amp, compaction_bytes))
            })
            // Find the compaction with the smallest write set
            .min_by_key(|(_, _, _waf, bytes)| *bytes)
            .map(|(window, curr_level_pull_in, _, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";

/// Leveled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 1 can hold up to `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
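///
/// # Examples
///
/// A minimal sketch of configuring the strategy via its builder methods
/// (the values shown are the defaults):
///
/// ```ignore
/// let strategy = Strategy::default()
///     .with_l0_threshold(4)
///     .with_table_target_size(/* 64 MiB */ 64 * 1_024 * 1_024)
///     .with_level_ratio_policy(vec![10.0]);
/// ```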
#[derive(Clone)]
pub struct Strategy {
    /// L0 table count threshold that triggers merging L0 into L1.
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
    ///
    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
    ///
    /// Default = [10.0]
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Sets the L0 threshold.
    ///
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    ///
    /// Default = 4
    #[must_use]
    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
        self.l0_threshold = threshold;
        self
    }

    /// Sets the table target size on disk (possibly compressed).
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    ///
    /// Default = 64 MiB
    #[must_use]
    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
        self.target_size = bytes;
        self
    }

    /// Calculates the size of L1.
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
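    ///
    /// For example, with the default configuration (`target_size` = 64 MiB,
    /// `l0_threshold` = 4, ratio = 10), the base size is 64 MiB * 4 = 256 MiB,
    /// so L1 = 256 MiB, L2 = 2.5 GiB, L3 = 25 GiB, and so on.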
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            self.level_base_size()
        } else {
            #[expect(
                clippy::cast_precision_loss,
                reason = "precision loss is acceptable for level size calculations"
            )]
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because L0 and L1 are already accounted for (|{L0, L1}| = 2)
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            #[expect(
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss,
                reason = "size is always positive and will never even come close to u64::MAX"
            )]
            {
                size as u64
            }
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

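                    // Serialized layout: a u8 element count, followed by each
                    // ratio as a little-endian f32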
                    let mut v = vec![];

                    #[expect(
                        clippy::expect_used,
                        clippy::cast_possible_truncation,
                        reason = "writing into Vec should not fail; policies have a length of 255 max"
                    )]
                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");
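
        // Overall flow: try a trivial move into Lmax, locate the canonical L1,
        // try a trivial move into L1, then score every level and compact the
        // one with the highest score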

        // Trivial move into Lmax
        'trivial_lmax: {
            let l0 = version.level(0).expect("first level should exist");

            if !l0.is_empty() && l0.is_disjoint() {
                let lmax_index = version.level_count() - 1;

                if (1..lmax_index)
                    .any(|idx| !version.level(idx).expect("level should exist").is_empty())
                {
                    // There are intermediary levels with data, so we cannot trivially move to Lmax
                    break 'trivial_lmax;
                }

                let lmax = version.level(lmax_index).expect("last level should exist");

                if !lmax
                    .aggregate_key_range()
                    .overlaps_with_key_range(&l0.aggregate_key_range())
                {
                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7)"
                    )]
                    return Choice::Move(CompactionInput {
                        table_ids: l0.list_ids(),
                        dest_level: lmax_index as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Find the level that corresponds to L1
        #[expect(clippy::map_unwrap_or)]
        let first_non_empty_level = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        let mut canonical_l1_idx = first_non_empty_level;

        // Number of levels we have to shift to get from the actual level idx to the canonical one
        let mut level_shift = canonical_l1_idx - 1;
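        // e.g. if the first non-empty level is the actual L3, canonical_l1_idx
        // is 3 and level_shift is 2, so the actual L3 maps to the canonical L1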

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Exclude tables that are already being compacted,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7, technically 255)"
                    )]
                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move L1 up one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
        'trivial: {
            let first_level = version.l0();
            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(target_level_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(target_level_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Check whether any tables in the next level overlap
                let first_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if first_overlapping.is_none() && first_level.is_disjoint() {
                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7)"
                    )]
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: target_level_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score first level
            let first_level = version.l0();

            if first_level.table_count() >= usize::from(self.l0_threshold) {
                #[expect(
                    clippy::cast_precision_loss,
                    reason = "precision loss is acceptable for scoring calculations"
                )]
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Exclude tables that are already being compacted,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                #[expect(
                    clippy::cast_possible_truncation,
                    reason = "level index is bounded by level count (7, technically 255)"
                )]
                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: idx is bounded by the level count (7, asserted above)
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    #[expect(
                        clippy::cast_precision_loss,
                        reason = "precision loss is acceptable for scoring calculations"
                    )]
                    let score = level_size as f64 / target_size as f64;
                    scores[idx] = (score, level_size - target_size);
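                    // e.g. with the default configuration, a 3 GiB level with a
                    // 2.5 GiB target scores 1.2, with a 0.5 GiB overshoot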

                    // NOTE: Force a trivial move
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level count (7, technically 255)"
            )]
            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        #[expect(
            clippy::expect_used,
            reason = "first run should exist because score is >0.0"
        )]
        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        #[expect(
            clippy::cast_possible_truncation,
            reason = "level shift is bounded by level count (7, technically 255)"
        )]
        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}