mini_lsm_mvcc/compact/leveled.rs

1use std::collections::HashSet;
2
3use serde::{Deserialize, Serialize};
4
5use crate::lsm_storage::LsmStorageState;
6
/// A leveled compaction job: merge `upper_level_sst_ids` together with the overlapping
/// `lower_level_sst_ids` and place the merged output into `lower_level`.
///
/// Levels are numbered from 1 (level `n` lives at `snapshot.levels[n - 1]`); L0 is kept
/// separately and is represented by `upper_level == None`. Serializable via serde.
#[derive(Debug, Serialize, Deserialize)]
pub struct LeveledCompactionTask {
    /// Level the upper SSTs are taken from. If `None`, this is an L0 compaction and the
    /// upper SSTs are taken from L0.
    pub upper_level: Option<usize>,
    /// Ids of the SSTs removed from the upper level (or from L0).
    pub upper_level_sst_ids: Vec<usize>,
    /// 1-based level that receives the merged output.
    pub lower_level: usize,
    /// Ids of the SSTs in `lower_level` whose key range overlaps the upper SSTs; they are
    /// merged and removed as well.
    pub lower_level_sst_ids: Vec<usize>,
    /// `true` when `lower_level` is the last level (`max_levels`).
    /// NOTE(review): presumably lets the compaction drop data that is only needed by lower
    /// levels (e.g. tombstones) — confirm with the compaction executor.
    pub is_lower_level_bottom_level: bool,
}
16
/// Tuning knobs for [`LeveledCompactionController`].
#[derive(Debug, Clone)]
pub struct LeveledCompactionOptions {
    /// Ratio between the target sizes of two adjacent levels: a level's target is the
    /// target of the level below divided by this value. Must be non-zero.
    pub level_size_multiplier: usize,
    /// Number of L0 SSTs at which all of L0 is compacted into the base level.
    pub level0_file_num_compaction_trigger: usize,
    /// Number of levels below L0 (levels `1..=max_levels`). Must be at least 1.
    pub max_levels: usize,
    /// Size in MiB used as the minimum target of the bottom level; a level above only gets
    /// a non-zero target once the level below is targeted larger than this.
    pub base_level_size_mb: usize,
}
24
/// Decides which leveled compaction to run next and applies finished compactions to an
/// LSM state snapshot, driven by [`LeveledCompactionOptions`].
pub struct LeveledCompactionController {
    options: LeveledCompactionOptions,
}
28
29impl LeveledCompactionController {
30    pub fn new(options: LeveledCompactionOptions) -> Self {
31        Self { options }
32    }
33
34    fn find_overlapping_ssts(
35        &self,
36        snapshot: &LsmStorageState,
37        sst_ids: &[usize],
38        in_level: usize,
39    ) -> Vec<usize> {
40        let begin_key = sst_ids
41            .iter()
42            .map(|id| snapshot.sstables[id].first_key())
43            .min()
44            .cloned()
45            .unwrap();
46        let end_key = sst_ids
47            .iter()
48            .map(|id| snapshot.sstables[id].last_key())
49            .max()
50            .cloned()
51            .unwrap();
52        let mut overlap_ssts = Vec::new();
53        for sst_id in &snapshot.levels[in_level - 1].1 {
54            let sst = &snapshot.sstables[sst_id];
55            let first_key = sst.first_key();
56            let last_key = sst.last_key();
57            if !(last_key < &begin_key || first_key > &end_key) {
58                overlap_ssts.push(*sst_id);
59            }
60        }
61        overlap_ssts
62    }
63
64    pub fn generate_compaction_task(
65        &self,
66        snapshot: &LsmStorageState,
67    ) -> Option<LeveledCompactionTask> {
68        // step 1: compute target level size
69        let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::<Vec<_>>(); // exclude level 0
70        let mut real_level_size = Vec::with_capacity(self.options.max_levels);
71        let mut base_level = self.options.max_levels;
72        for i in 0..self.options.max_levels {
73            real_level_size.push(
74                snapshot.levels[i]
75                    .1
76                    .iter()
77                    .map(|x| snapshot.sstables.get(x).unwrap().table_size())
78                    .sum::<u64>() as usize,
79            );
80        }
81        let base_level_size_bytes = self.options.base_level_size_mb * 1024 * 1024;
82
83        // select base level and compute target level size
84        target_level_size[self.options.max_levels - 1] =
85            real_level_size[self.options.max_levels - 1].max(base_level_size_bytes);
86        for i in (0..(self.options.max_levels - 1)).rev() {
87            let next_level_size = target_level_size[i + 1];
88            let this_level_size = next_level_size / self.options.level_size_multiplier;
89            if next_level_size > base_level_size_bytes {
90                target_level_size[i] = this_level_size;
91            }
92            if target_level_size[i] > 0 {
93                base_level = i + 1;
94            }
95        }
96
97        // Flush L0 SST is the top priority
98        if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger {
99            println!("flush L0 SST to base level {}", base_level);
100            return Some(LeveledCompactionTask {
101                upper_level: None,
102                upper_level_sst_ids: snapshot.l0_sstables.clone(),
103                lower_level: base_level,
104                lower_level_sst_ids: self.find_overlapping_ssts(
105                    snapshot,
106                    &snapshot.l0_sstables,
107                    base_level,
108                ),
109                is_lower_level_bottom_level: base_level == self.options.max_levels,
110            });
111        }
112
113        let mut priorities = Vec::with_capacity(self.options.max_levels);
114        for level in 0..self.options.max_levels {
115            let prio = real_level_size[level] as f64 / target_level_size[level] as f64;
116            if prio > 1.0 {
117                priorities.push((prio, level + 1));
118            }
119        }
120        priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse());
121
122        let priority = priorities.first();
123        if let Some((_, level)) = priority {
124            println!(
125                "target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
126                target_level_size
127                    .iter()
128                    .map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
129                    .collect::<Vec<_>>(),
130                real_level_size
131                    .iter()
132                    .map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
133                    .collect::<Vec<_>>(),
134                base_level,
135            );
136
137            let level = *level;
138            let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); // select the oldest sst to compact
139            println!(
140                "compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction",
141                priorities
142            );
143            return Some(LeveledCompactionTask {
144                upper_level: Some(level),
145                upper_level_sst_ids: vec![selected_sst],
146                lower_level: level + 1,
147                lower_level_sst_ids: self.find_overlapping_ssts(
148                    snapshot,
149                    &[selected_sst],
150                    level + 1,
151                ),
152                is_lower_level_bottom_level: level + 1 == self.options.max_levels,
153            });
154        }
155        None
156    }
157
158    pub fn apply_compaction_result(
159        &self,
160        snapshot: &LsmStorageState,
161        task: &LeveledCompactionTask,
162        output: &[usize],
163    ) -> (LsmStorageState, Vec<usize>) {
164        let mut snapshot = snapshot.clone();
165        let mut files_to_remove = Vec::new();
166        let mut upper_level_sst_ids_set = task
167            .upper_level_sst_ids
168            .iter()
169            .copied()
170            .collect::<HashSet<_>>();
171        let mut lower_level_sst_ids_set = task
172            .lower_level_sst_ids
173            .iter()
174            .copied()
175            .collect::<HashSet<_>>();
176        if let Some(upper_level) = task.upper_level {
177            let new_upper_level_ssts = snapshot.levels[upper_level - 1]
178                .1
179                .iter()
180                .filter_map(|x| {
181                    if upper_level_sst_ids_set.remove(x) {
182                        return None;
183                    }
184                    Some(*x)
185                })
186                .collect::<Vec<_>>();
187            assert!(upper_level_sst_ids_set.is_empty());
188            snapshot.levels[upper_level - 1].1 = new_upper_level_ssts;
189        } else {
190            let new_l0_ssts = snapshot
191                .l0_sstables
192                .iter()
193                .filter_map(|x| {
194                    if upper_level_sst_ids_set.remove(x) {
195                        return None;
196                    }
197                    Some(*x)
198                })
199                .collect::<Vec<_>>();
200            assert!(upper_level_sst_ids_set.is_empty());
201            snapshot.l0_sstables = new_l0_ssts;
202        }
203
204        files_to_remove.extend(&task.upper_level_sst_ids);
205        files_to_remove.extend(&task.lower_level_sst_ids);
206
207        let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1]
208            .1
209            .iter()
210            .filter_map(|x| {
211                if lower_level_sst_ids_set.remove(x) {
212                    return None;
213                }
214                Some(*x)
215            })
216            .collect::<Vec<_>>();
217        assert!(lower_level_sst_ids_set.is_empty());
218        new_lower_level_ssts.extend(output);
219        new_lower_level_ssts.sort_by(|x, y| {
220            snapshot
221                .sstables
222                .get(x)
223                .unwrap()
224                .first_key()
225                .cmp(snapshot.sstables.get(y).unwrap().first_key())
226        });
227        snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
228        (snapshot, files_to_remove)
229    }
230}