mini_lsm/compact/tiered.rs

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

use crate::lsm_storage::LsmStorageState;

#[derive(Debug, Serialize, Deserialize)]
pub struct TieredCompactionTask {
    pub tiers: Vec<(usize, Vec<usize>)>,
    pub bottom_tier_included: bool,
}

#[derive(Debug, Clone)]
pub struct TieredCompactionOptions {
    pub num_tiers: usize,
    pub max_size_amplification_percent: usize,
    pub size_ratio: usize,
    pub min_merge_width: usize,
}

pub struct TieredCompactionController {
    options: TieredCompactionOptions,
}

impl TieredCompactionController {
    pub fn new(options: TieredCompactionOptions) -> Self {
        Self { options }
    }

    pub fn generate_compaction_task(
        &self,
        snapshot: &LsmStorageState,
    ) -> Option<TieredCompactionTask> {
        assert!(
            snapshot.l0_sstables.is_empty(),
            "should not add l0 ssts in tiered compaction"
        );
        if snapshot.levels.len() < self.options.num_tiers {
            return None;
        }
        // compaction triggered by space amplification ratio
        let mut size = 0;
        for id in 0..(snapshot.levels.len() - 1) {
            size += snapshot.levels[id].1.len();
        }
        let space_amp_ratio =
            (size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0;
        if space_amp_ratio >= self.options.max_size_amplification_percent as f64 {
            println!(
                "compaction triggered by space amplification ratio: {}",
                space_amp_ratio
            );
            return Some(TieredCompactionTask {
                tiers: snapshot.levels.clone(),
                bottom_tier_included: true,
            });
        }
        let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0;
        // compaction triggered by size ratio
        let mut size = 0;
        for id in 0..(snapshot.levels.len() - 1) {
            size += snapshot.levels[id].1.len();
            let next_level_size = snapshot.levels[id + 1].1.len();
            let current_size_ratio = size as f64 / next_level_size as f64;
            if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
                println!(
                    "compaction triggered by size ratio: {}",
                    current_size_ratio * 100.0
                );
                return Some(TieredCompactionTask {
                    tiers: snapshot
                        .levels
                        .iter()
                        .take(id + 2)
                        .cloned()
                        .collect::<Vec<_>>(),
                    bottom_tier_included: id + 2 >= snapshot.levels.len(),
                });
            }
        }
        // compaction triggered by reducing the number of sorted runs: take enough
        // upper tiers so that the tree drops back below `num_tiers` sorted runs
        let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2;
        println!("compaction triggered by reducing sorted runs");
        return Some(TieredCompactionTask {
            tiers: snapshot
                .levels
                .iter()
                .take(num_tiers_to_take)
                .cloned()
                .collect::<Vec<_>>(),
            bottom_tier_included: snapshot.levels.len() >= num_tiers_to_take,
        });
    }

    pub fn apply_compaction_result(
        &self,
        snapshot: &LsmStorageState,
        task: &TieredCompactionTask,
        output: &[usize],
    ) -> (LsmStorageState, Vec<usize>) {
        assert!(
            snapshot.l0_sstables.is_empty(),
            "should not add l0 ssts in tiered compaction"
        );
        let mut snapshot = snapshot.clone();
        let mut tier_to_remove = task
            .tiers
            .iter()
            .map(|(x, y)| (*x, y))
            .collect::<HashMap<_, _>>();
        let mut levels = Vec::new();
        let mut new_tier_added = false;
        let mut files_to_remove = Vec::new();
        for (tier_id, files) in &snapshot.levels {
            if let Some(ffiles) = tier_to_remove.remove(tier_id) {
                // this tier was consumed by the compaction; schedule its files for removal
                assert_eq!(ffiles, files, "file changed after issuing compaction task");
                files_to_remove.extend(ffiles.iter().copied());
            } else {
                // this tier was not part of the compaction; keep it as-is
                levels.push((*tier_id, files.clone()));
            }
            if tier_to_remove.is_empty() && !new_tier_added {
                // insert the compacted output as a new tier right after the last consumed tier
                new_tier_added = true;
                levels.push((output[0], output.to_vec()));
            }
        }
        if !tier_to_remove.is_empty() {
            unreachable!("some tiers not found??");
        }
        snapshot.levels = levels;
        (snapshot, files_to_remove)
    }
}
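
// A minimal sketch of the two trigger formulas above, checked with hand-picked
// numbers. It exercises only the arithmetic, not the controller, so it does not
// construct an `LsmStorageState`; the tier sizes and option values below are
// illustrative assumptions, not values from the mini-lsm test suite.
#[cfg(test)]
mod trigger_math_sketch {
    #[test]
    fn space_amplification_and_size_ratio_examples() {
        // Space amplification: every tier except the bottom-most one, divided by
        // the bottom-most tier, as a percentage. With tiers of 2, 2, and 4 files:
        // (2 + 2) / 4 * 100 = 100.
        let tier_sizes = [2usize, 2, 4];
        let upper: usize = tier_sizes[..tier_sizes.len() - 1].iter().sum();
        let space_amp_ratio = upper as f64 / *tier_sizes.last().unwrap() as f64 * 100.0;
        assert_eq!(space_amp_ratio, 100.0);
        // With `max_size_amplification_percent = 200`, this snapshot would not
        // trigger a full compaction; with 100 or less, it would.
        assert!(space_amp_ratio < 200.0);

        // Size ratio: with `size_ratio = 1`, the trigger is (100 + 1) / 100 = 1.01.
        // At the last iteration the running sum of upper tiers (2 + 2 = 4) versus
        // the next tier (4) gives 4 / 4 = 1.0, which stays below the trigger, so
        // this rule would not issue a compaction for this snapshot either.
        let size_ratio_trigger = (100.0 + 1.0) / 100.0;
        let current_size_ratio = (2 + 2) as f64 / 4.0;
        assert!(current_size_ratio < size_ratio_trigger);
    }
}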