mini_lsm_mvcc/compact/
simple_leveled.rs1use std::collections::HashSet;
2
3use serde::{Deserialize, Serialize};
4
5use crate::lsm_storage::LsmStorageState;
6
7#[derive(Debug, Clone)]
8pub struct SimpleLeveledCompactionOptions {
9 pub size_ratio_percent: usize,
10 pub level0_file_num_compaction_trigger: usize,
11 pub max_levels: usize,
12}
13
14#[derive(Debug, Serialize, Deserialize)]
15pub struct SimpleLeveledCompactionTask {
16 pub upper_level: Option<usize>,
18 pub upper_level_sst_ids: Vec<usize>,
19 pub lower_level: usize,
20 pub lower_level_sst_ids: Vec<usize>,
21 pub is_lower_level_bottom_level: bool,
22}
23
24pub struct SimpleLeveledCompactionController {
25 options: SimpleLeveledCompactionOptions,
26}
27
28impl SimpleLeveledCompactionController {
29 pub fn new(options: SimpleLeveledCompactionOptions) -> Self {
30 Self { options }
31 }
32
33 pub fn generate_compaction_task(
37 &self,
38 snapshot: &LsmStorageState,
39 ) -> Option<SimpleLeveledCompactionTask> {
40 let mut level_sizes = Vec::new();
41 level_sizes.push(snapshot.l0_sstables.len());
42 for (_, files) in &snapshot.levels {
43 level_sizes.push(files.len());
44 }
45
46 for i in 0..self.options.max_levels {
47 if i == 0
48 && snapshot.l0_sstables.len() < self.options.level0_file_num_compaction_trigger
49 {
50 continue;
51 }
52
53 let lower_level = i + 1;
54 let size_ratio = level_sizes[lower_level] as f64 / level_sizes[i] as f64;
55 if size_ratio < self.options.size_ratio_percent as f64 / 100.0 {
56 println!(
57 "compaction triggered at level {} and {} with size ratio {}",
58 i, lower_level, size_ratio
59 );
60 return Some(SimpleLeveledCompactionTask {
61 upper_level: if i == 0 { None } else { Some(i) },
62 upper_level_sst_ids: if i == 0 {
63 snapshot.l0_sstables.clone()
64 } else {
65 snapshot.levels[i - 1].1.clone()
66 },
67 lower_level,
68 lower_level_sst_ids: snapshot.levels[lower_level - 1].1.clone(),
69 is_lower_level_bottom_level: lower_level == self.options.max_levels,
70 });
71 }
72 }
73 None
74 }
75
76 pub fn apply_compaction_result(
84 &self,
85 snapshot: &LsmStorageState,
86 task: &SimpleLeveledCompactionTask,
87 output: &[usize],
88 ) -> (LsmStorageState, Vec<usize>) {
89 let mut snapshot = snapshot.clone();
90 let mut files_to_remove = Vec::new();
91 if let Some(upper_level) = task.upper_level {
92 assert_eq!(
93 task.upper_level_sst_ids,
94 snapshot.levels[upper_level - 1].1,
95 "sst mismatched"
96 );
97 files_to_remove.extend(&snapshot.levels[upper_level - 1].1);
98 snapshot.levels[upper_level - 1].1.clear();
99 } else {
100 files_to_remove.extend(&task.upper_level_sst_ids);
101 let mut l0_ssts_compacted = task
102 .upper_level_sst_ids
103 .iter()
104 .copied()
105 .collect::<HashSet<_>>();
106 let new_l0_sstables = snapshot
107 .l0_sstables
108 .iter()
109 .copied()
110 .filter(|x| !l0_ssts_compacted.remove(x))
111 .collect::<Vec<_>>();
112 assert!(l0_ssts_compacted.is_empty());
113 snapshot.l0_sstables = new_l0_sstables;
114 }
115 assert_eq!(
116 task.lower_level_sst_ids,
117 snapshot.levels[task.lower_level - 1].1,
118 "sst mismatched"
119 );
120 files_to_remove.extend(&snapshot.levels[task.lower_level - 1].1);
121 snapshot.levels[task.lower_level - 1].1 = output.to_vec();
122 (snapshot, files_to_remove)
123 }
124}