mini_lsm/compact/
leveled.rs1use std::collections::HashSet;
2
3use serde::{Deserialize, Serialize};
4
5use crate::lsm_storage::LsmStorageState;
6
7#[derive(Debug, Serialize, Deserialize)]
8pub struct LeveledCompactionTask {
9 pub upper_level: Option<usize>,
11 pub upper_level_sst_ids: Vec<usize>,
12 pub lower_level: usize,
13 pub lower_level_sst_ids: Vec<usize>,
14 pub is_lower_level_bottom_level: bool,
15}
16
/// Tuning knobs for [`LeveledCompactionController`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeveledCompactionOptions {
    /// Ratio between the target sizes of adjacent levels: each level's target is the
    /// next-lower level's target divided by this value.
    pub level_size_multiplier: usize,
    /// Once L0 holds at least this many SSTs, all of L0 is compacted into the base level.
    pub level0_file_num_compaction_trigger: usize,
    /// Number of levels below L0.
    pub max_levels: usize,
    /// Base level size in MiB (multiplied by `1024 * 1024` to get bytes). The last level's
    /// target is never smaller than this. An upper level only receives a non-zero target
    /// while the level below it has a target larger than this.
    pub base_level_size_mb: usize,
}

25pub struct LeveledCompactionController {
26 options: LeveledCompactionOptions,
27}
28
29impl LeveledCompactionController {
30 pub fn new(options: LeveledCompactionOptions) -> Self {
31 Self { options }
32 }
33
34 fn find_overlapping_ssts(
35 &self,
36 snapshot: &LsmStorageState,
37 sst_ids: &[usize],
38 in_level: usize,
39 ) -> Vec<usize> {
40 let begin_key = sst_ids
41 .iter()
42 .map(|id| snapshot.sstables[id].first_key())
43 .min()
44 .cloned()
45 .unwrap();
46 let end_key = sst_ids
47 .iter()
48 .map(|id| snapshot.sstables[id].last_key())
49 .max()
50 .cloned()
51 .unwrap();
52 let mut overlap_ssts = Vec::new();
53 for sst_id in &snapshot.levels[in_level - 1].1 {
54 let sst = &snapshot.sstables[sst_id];
55 let first_key = sst.first_key();
56 let last_key = sst.last_key();
57 if !(last_key < &begin_key || first_key > &end_key) {
58 overlap_ssts.push(*sst_id);
59 }
60 }
61 overlap_ssts
62 }
63
64 pub fn generate_compaction_task(
65 &self,
66 snapshot: &LsmStorageState,
67 ) -> Option<LeveledCompactionTask> {
68 let mut target_level_size = (0..self.options.max_levels).map(|_| 0).collect::<Vec<_>>(); let mut real_level_size = Vec::with_capacity(self.options.max_levels);
71 let mut base_level = self.options.max_levels;
72 for i in 0..self.options.max_levels {
73 real_level_size.push(
74 snapshot.levels[i]
75 .1
76 .iter()
77 .map(|x| snapshot.sstables.get(x).unwrap().table_size())
78 .sum::<u64>() as usize,
79 );
80 }
81 let base_level_size_bytes = self.options.base_level_size_mb * 1024 * 1024;
82
83 target_level_size[self.options.max_levels - 1] =
85 real_level_size[self.options.max_levels - 1].max(base_level_size_bytes);
86 for i in (0..(self.options.max_levels - 1)).rev() {
87 let next_level_size = target_level_size[i + 1];
88 let this_level_size = next_level_size / self.options.level_size_multiplier;
89 if next_level_size > base_level_size_bytes {
90 target_level_size[i] = this_level_size;
91 }
92 if target_level_size[i] > 0 {
93 base_level = i + 1;
94 }
95 }
96
97 if snapshot.l0_sstables.len() >= self.options.level0_file_num_compaction_trigger {
99 println!("flush L0 SST to base level {}", base_level);
100 return Some(LeveledCompactionTask {
101 upper_level: None,
102 upper_level_sst_ids: snapshot.l0_sstables.clone(),
103 lower_level: base_level,
104 lower_level_sst_ids: self.find_overlapping_ssts(
105 snapshot,
106 &snapshot.l0_sstables,
107 base_level,
108 ),
109 is_lower_level_bottom_level: base_level == self.options.max_levels,
110 });
111 }
112
113 let mut priorities = Vec::with_capacity(self.options.max_levels);
114 for level in 0..self.options.max_levels {
115 let prio = real_level_size[level] as f64 / target_level_size[level] as f64;
116 if prio > 1.0 {
117 priorities.push((prio, level + 1));
118 }
119 }
120 priorities.sort_by(|a, b| a.partial_cmp(b).unwrap().reverse());
121 let priority = priorities.first();
122 if let Some((_, level)) = priority {
123 println!(
124 "target level sizes: {:?}, real level sizes: {:?}, base_level: {}",
125 target_level_size
126 .iter()
127 .map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
128 .collect::<Vec<_>>(),
129 real_level_size
130 .iter()
131 .map(|x| format!("{:.3}MB", *x as f64 / 1024.0 / 1024.0))
132 .collect::<Vec<_>>(),
133 base_level,
134 );
135
136 let level = *level;
137 let selected_sst = snapshot.levels[level - 1].1.iter().min().copied().unwrap(); println!(
139 "compaction triggered by priority: {level} out of {:?}, select {selected_sst} for compaction",
140 priorities
141 );
142 return Some(LeveledCompactionTask {
143 upper_level: Some(level),
144 upper_level_sst_ids: vec![selected_sst],
145 lower_level: level + 1,
146 lower_level_sst_ids: self.find_overlapping_ssts(
147 snapshot,
148 &[selected_sst],
149 level + 1,
150 ),
151 is_lower_level_bottom_level: level + 1 == self.options.max_levels,
152 });
153 }
154 None
155 }
156
157 pub fn apply_compaction_result(
158 &self,
159 snapshot: &LsmStorageState,
160 task: &LeveledCompactionTask,
161 output: &[usize],
162 ) -> (LsmStorageState, Vec<usize>) {
163 let mut snapshot = snapshot.clone();
164 let mut files_to_remove = Vec::new();
165 let mut upper_level_sst_ids_set = task
166 .upper_level_sst_ids
167 .iter()
168 .copied()
169 .collect::<HashSet<_>>();
170 let mut lower_level_sst_ids_set = task
171 .lower_level_sst_ids
172 .iter()
173 .copied()
174 .collect::<HashSet<_>>();
175 if let Some(upper_level) = task.upper_level {
176 let new_upper_level_ssts = snapshot.levels[upper_level - 1]
177 .1
178 .iter()
179 .filter_map(|x| {
180 if upper_level_sst_ids_set.remove(x) {
181 return None;
182 }
183 Some(*x)
184 })
185 .collect::<Vec<_>>();
186 assert!(upper_level_sst_ids_set.is_empty());
187 snapshot.levels[upper_level - 1].1 = new_upper_level_ssts;
188 } else {
189 let new_l0_ssts = snapshot
190 .l0_sstables
191 .iter()
192 .filter_map(|x| {
193 if upper_level_sst_ids_set.remove(x) {
194 return None;
195 }
196 Some(*x)
197 })
198 .collect::<Vec<_>>();
199 assert!(upper_level_sst_ids_set.is_empty());
200 snapshot.l0_sstables = new_l0_ssts;
201 }
202
203 files_to_remove.extend(&task.upper_level_sst_ids);
204 files_to_remove.extend(&task.lower_level_sst_ids);
205
206 let mut new_lower_level_ssts = snapshot.levels[task.lower_level - 1]
207 .1
208 .iter()
209 .filter_map(|x| {
210 if lower_level_sst_ids_set.remove(x) {
211 return None;
212 }
213 Some(*x)
214 })
215 .collect::<Vec<_>>();
216 assert!(lower_level_sst_ids_set.is_empty());
217 new_lower_level_ssts.extend(output);
218 new_lower_level_ssts.sort_by(|x, y| {
219 snapshot
220 .sstables
221 .get(x)
222 .unwrap()
223 .first_key()
224 .cmp(snapshot.sstables.get(y).unwrap().first_key())
225 });
226 snapshot.levels[task.lower_level - 1].1 = new_lower_level_ssts;
227 (snapshot, files_to_remove)
228 }
229}