mini_lsm/compact/
tiered.rs

1use std::collections::HashMap;
2
3use serde::{Deserialize, Serialize};
4
5use crate::lsm_storage::LsmStorageState;
6
7#[derive(Debug, Serialize, Deserialize)]
8pub struct TieredCompactionTask {
9    pub tiers: Vec<(usize, Vec<usize>)>,
10    pub bottom_tier_included: bool,
11}
12
/// Tuning knobs for [`TieredCompactionController`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TieredCompactionOptions {
    /// No compaction task is generated while the snapshot holds fewer tiers
    /// than this.
    pub num_tiers: usize,
    /// Space-amplification threshold in percent: the number of files in all
    /// tiers except the last, divided by the number of files in the last tier,
    /// times 100. Reaching it compacts every tier.
    pub max_size_amplification_percent: usize,
    /// Size-ratio setting in percent. A prefix of tiers is compacted once the
    /// cumulative file count of the tiers above a tier, divided by that tier's
    /// file count, reaches `(100 + size_ratio) / 100`.
    pub size_ratio: usize,
    /// Minimum number of tiers a size-ratio-triggered task must contain.
    pub min_merge_width: usize,
}
20
21pub struct TieredCompactionController {
22    options: TieredCompactionOptions,
23}
24
25impl TieredCompactionController {
26    pub fn new(options: TieredCompactionOptions) -> Self {
27        Self { options }
28    }
29
30    pub fn generate_compaction_task(
31        &self,
32        snapshot: &LsmStorageState,
33    ) -> Option<TieredCompactionTask> {
34        assert!(
35            snapshot.l0_sstables.is_empty(),
36            "should not add l0 ssts in tiered compaction"
37        );
38        if snapshot.levels.len() < self.options.num_tiers {
39            return None;
40        }
41        // compaction triggered by space amplification ratio
42        let mut size = 0;
43        for id in 0..(snapshot.levels.len() - 1) {
44            size += snapshot.levels[id].1.len();
45        }
46        let space_amp_ratio =
47            (size as f64) / (snapshot.levels.last().unwrap().1.len() as f64) * 100.0;
48        if space_amp_ratio >= self.options.max_size_amplification_percent as f64 {
49            println!(
50                "compaction triggered by space amplification ratio: {}",
51                space_amp_ratio
52            );
53            return Some(TieredCompactionTask {
54                tiers: snapshot.levels.clone(),
55                bottom_tier_included: true,
56            });
57        }
58        let size_ratio_trigger = (100.0 + self.options.size_ratio as f64) / 100.0;
59        // compaction triggered by size ratio
60        let mut size = 0;
61        for id in 0..(snapshot.levels.len() - 1) {
62            size += snapshot.levels[id].1.len();
63            let next_level_size = snapshot.levels[id + 1].1.len();
64            let current_size_ratio = size as f64 / next_level_size as f64;
65            if current_size_ratio >= size_ratio_trigger && id + 2 >= self.options.min_merge_width {
66                println!(
67                    "compaction triggered by size ratio: {}",
68                    current_size_ratio * 100.0
69                );
70                return Some(TieredCompactionTask {
71                    tiers: snapshot
72                        .levels
73                        .iter()
74                        .take(id + 2)
75                        .cloned()
76                        .collect::<Vec<_>>(),
77                    bottom_tier_included: id + 2 >= snapshot.levels.len(),
78                });
79            }
80        }
81        // trying to reduce sorted runs without respecting size ratio
82        let num_tiers_to_take = snapshot.levels.len() - self.options.num_tiers + 2;
83        println!("compaction triggered by reducing sorted runs");
84        return Some(TieredCompactionTask {
85            tiers: snapshot
86                .levels
87                .iter()
88                .take(num_tiers_to_take)
89                .cloned()
90                .collect::<Vec<_>>(),
91            bottom_tier_included: snapshot.levels.len() >= num_tiers_to_take,
92        });
93    }
94
95    pub fn apply_compaction_result(
96        &self,
97        snapshot: &LsmStorageState,
98        task: &TieredCompactionTask,
99        output: &[usize],
100    ) -> (LsmStorageState, Vec<usize>) {
101        assert!(
102            snapshot.l0_sstables.is_empty(),
103            "should not add l0 ssts in tiered compaction"
104        );
105        let mut snapshot = snapshot.clone();
106        let mut tier_to_remove = task
107            .tiers
108            .iter()
109            .map(|(x, y)| (*x, y))
110            .collect::<HashMap<_, _>>();
111        let mut levels = Vec::new();
112        let mut new_tier_added = false;
113        let mut files_to_remove = Vec::new();
114        for (tier_id, files) in &snapshot.levels {
115            if let Some(ffiles) = tier_to_remove.remove(tier_id) {
116                // the tier should be removed
117                assert_eq!(ffiles, files, "file changed after issuing compaction task");
118                files_to_remove.extend(ffiles.iter().copied());
119            } else {
120                // retain the tier
121                levels.push((*tier_id, files.clone()));
122            }
123            if tier_to_remove.is_empty() && !new_tier_added {
124                // add the compacted tier to the LSM tree
125                new_tier_added = true;
126                levels.push((output[0], output.to_vec()));
127            }
128        }
129        if !tier_to_remove.is_empty() {
130            unreachable!("some tiers not found??");
131        }
132        snapshot.levels = levels;
133        (snapshot, files_to_remove)
134    }
135}