mini_lsm_mvcc/compact/
simple_leveled.rs

1use std::collections::HashSet;
2
3use serde::{Deserialize, Serialize};
4
5use crate::lsm_storage::LsmStorageState;
6
/// Tuning knobs for [`SimpleLeveledCompactionController`].
#[derive(Debug, Clone)]
pub struct SimpleLeveledCompactionOptions {
    /// Size-ratio threshold, expressed as a percentage.
    ///
    /// An upper level is compacted into the level below it when
    /// `lower_file_count / upper_file_count` is strictly less than
    /// `size_ratio_percent / 100`.
    pub size_ratio_percent: usize,
    /// Minimum number of L0 SSTs before L0 is considered for compaction.
    pub level0_file_num_compaction_trigger: usize,
    /// Number of levels below L0 that the controller scans. The level whose
    /// 1-based number equals this value is treated as the bottom level.
    pub max_levels: usize,
}
13
14#[derive(Debug, Serialize, Deserialize)]
15pub struct SimpleLeveledCompactionTask {
16    // if upper_level is `None`, then it is L0 compaction
17    pub upper_level: Option<usize>,
18    pub upper_level_sst_ids: Vec<usize>,
19    pub lower_level: usize,
20    pub lower_level_sst_ids: Vec<usize>,
21    pub is_lower_level_bottom_level: bool,
22}
23
24pub struct SimpleLeveledCompactionController {
25    options: SimpleLeveledCompactionOptions,
26}
27
28impl SimpleLeveledCompactionController {
29    pub fn new(options: SimpleLeveledCompactionOptions) -> Self {
30        Self { options }
31    }
32
33    /// Generates a compaction task.
34    ///
35    /// Returns `None` if no compaction needs to be scheduled. The order of SSTs in the compaction task id vector matters.
36    pub fn generate_compaction_task(
37        &self,
38        snapshot: &LsmStorageState,
39    ) -> Option<SimpleLeveledCompactionTask> {
40        let mut level_sizes = Vec::new();
41        level_sizes.push(snapshot.l0_sstables.len());
42        for (_, files) in &snapshot.levels {
43            level_sizes.push(files.len());
44        }
45
46        for i in 0..self.options.max_levels {
47            if i == 0
48                && snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger
49            {
50                continue;
51            }
52
53            let lower_level = i + 1;
54            let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64;
55            if size_ratio < self.options.size_ratio_percent as f64 / 100.0 {
56                println!(
57                    "compaction triggered at level {} and {} with size ratio {}",
58                    i, lower_level, size_ratio
59                );
60                return Some(SimpleLeveledCompactionTask {
61                    upper_level: if i == 0 { None } else { Some(i) },
62                    upper_level_sst_ids: if i == 0 {
63                        snapshot.l0_sstables.clone()
64                    } else {
65                        snapshot.levels[i - 1].1.clone()
66                    },
67                    lower_level,
68                    lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(),
69                    is_lower_level_bottom_level: lower_level == self.options.max_levels,
70                });
71            }
72        }
73        None
74    }
75
76    /// Apply the compaction result.
77    ///
78    /// The compactor will call this function with the compaction task and the list of SST ids generated. This function applies the
79    /// result and generates a new LSM state. The functions should only change `l0_sstables` and `levels` without changing memtables
80    /// and `sstables` hash map. Though there should only be one thread running compaction jobs, you should think about the case
81    /// where an L0 SST gets flushed while the compactor generates new SSTs, and with that in mind, you should do some sanity checks
82    /// in your implementation.
83    pub fn apply_compaction_result(
84        &self,
85        snapshot: &LsmStorageState,
86        task: &SimpleLeveledCompactionTask,
87        output: &[usize],
88    ) -> (LsmStorageState, Vec<usize>) {
89        let mut snapshot = snapshot.clone();
90        let mut files_to_remove = Vec::new();
91        if let Some(upper_level) = task.upper_level {
92            assert_eq!(
93                task.upper_level_sst_ids,
94                snapshot.levels[upper_level - 1].1,
95                "sst mismatched"
96            );
97            files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
98            snapshot.levels[upper_level - 1].1.clear();
99        } else {
100            files_to_remove.extend(&task.upper_level_sst_ids);
101            let mut l0_ssts_compacted = task
102                .upper_level_sst_ids
103                .iter()
104                .copied()
105                .collect::<HashSet<_>>();
106            let new_l0_sstables = snapshot
107                .l0_sstables
108                .iter()
109                .copied()
110                .filter(|x| !l0_ssts_compacted.remove(x))
111                .collect::<Vec<_>>();
112            assert!(l0_ssts_compacted.is_empty());
113            snapshot.l0_sstables = new_l0_sstables;
114        }
115        assert_eq!(
116            task.lower_level_sst_ids,
117            snapshot.levels[task.lower_level - 1].1,
118            "sst mismatched"
119        );
120        files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1);
121        snapshot.levels[task.lower_level - 1].1 = output.to_vec();
122        (snapshot, files_to_remove)
123    }
124}