// mini_lsm/compact.rs

1mod leveled;
2mod simple_leveled;
3mod tiered;
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
11use serde::{Deserialize, Serialize};
12pub use simple_leveled::{
13    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
14};
15pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
16
17use crate::iterators::concat_iterator::SstConcatIterator;
18use crate::iterators::merge_iterator::MergeIterator;
19use crate::iterators::two_merge_iterator::TwoMergeIterator;
20use crate::iterators::StorageIterator;
21use crate::key::KeySlice;
22use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
23use crate::manifest::ManifestRecord;
24use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
25
26#[derive(Debug, Serialize, Deserialize)]
27pub enum CompactionTask {
28    Leveled(LeveledCompactionTask),
29    Tiered(TieredCompactionTask),
30    Simple(SimpleLeveledCompactionTask),
31    ForceFullCompaction {
32        l0_sstables: Vec<usize>,
33        l1_sstables: Vec<usize>,
34    },
35}
36
37impl CompactionTask {
38    fn compact_to_bottom_level(&self) -> bool {
39        match self {
40            CompactionTask::ForceFullCompaction { .. } => true,
41            CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
42            CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
43            CompactionTask::Tiered(task) => task.bottom_tier_included,
44        }
45    }
46}
47
48pub(crate) enum CompactionController {
49    Leveled(LeveledCompactionController),
50    Tiered(TieredCompactionController),
51    Simple(SimpleLeveledCompactionController),
52    NoCompaction,
53}
54
55impl CompactionController {
56    pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
57        match self {
58            CompactionController::Leveled(ctrl) => ctrl
59                .generate_compaction_task(snapshot)
60                .map(CompactionTask::Leveled),
61            CompactionController::Simple(ctrl) => ctrl
62                .generate_compaction_task(snapshot)
63                .map(CompactionTask::Simple),
64            CompactionController::Tiered(ctrl) => ctrl
65                .generate_compaction_task(snapshot)
66                .map(CompactionTask::Tiered),
67            CompactionController::NoCompaction => unreachable!(),
68        }
69    }
70
71    pub fn apply_compaction_result(
72        &self,
73        snapshot: &LsmStorageState,
74        task: &CompactionTask,
75        output: &[usize],
76    ) -> (LsmStorageState, Vec<usize>) {
77        match (self, task) {
78            (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
79                ctrl.apply_compaction_result(snapshot, task, output)
80            }
81            (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
82                ctrl.apply_compaction_result(snapshot, task, output)
83            }
84            (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
85                ctrl.apply_compaction_result(snapshot, task, output)
86            }
87            _ => unreachable!(),
88        }
89    }
90}
91
92impl CompactionController {
93    pub fn flush_to_l0(&self) -> bool {
94        matches!(
95            self,
96            Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction
97        )
98    }
99}
100
101#[derive(Debug, Clone)]
102pub enum CompactionOptions {
103    /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
104    /// Compaction)
105    Leveled(LeveledCompactionOptions),
106    /// Tiered compaction (= RocksDB's universal compaction)
107    Tiered(TieredCompactionOptions),
108    /// Simple leveled compaction
109    Simple(SimpleLeveledCompactionOptions),
110    /// In no compaction mode (week 1), always flush to L0
111    NoCompaction,
112}
113
114impl LsmStorageInner {
115    fn compact_generate_sst_from_iter(
116        &self,
117        mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
118        compact_to_bottom_level: bool,
119    ) -> Result<Vec<Arc<SsTable>>> {
120        let mut builder = None;
121        let mut new_sst = Vec::new();
122
123        while iter.is_valid() {
124            if builder.is_none() {
125                builder = Some(SsTableBuilder::new(self.options.block_size));
126            }
127            let builder_inner = builder.as_mut().unwrap();
128            if compact_to_bottom_level {
129                if !iter.value().is_empty() {
130                    builder_inner.add(iter.key(), iter.value());
131                }
132            } else {
133                builder_inner.add(iter.key(), iter.value());
134            }
135            iter.next()?;
136
137            if builder_inner.estimated_size() >= self.options.target_sst_size {
138                let sst_id = self.next_sst_id();
139                let builder = builder.take().unwrap();
140                let sst = Arc::new(builder.build(
141                    sst_id,
142                    Some(self.block_cache.clone()),
143                    self.path_of_sst(sst_id),
144                )?);
145                new_sst.push(sst);
146            }
147        }
148        if let Some(builder) = builder {
149            let sst_id = self.next_sst_id(); // lock dropped here
150            let sst = Arc::new(builder.build(
151                sst_id,
152                Some(self.block_cache.clone()),
153                self.path_of_sst(sst_id),
154            )?);
155            new_sst.push(sst);
156        }
157        Ok(new_sst)
158    }
159
160    fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
161        let snapshot = {
162            let state = self.state.read();
163            state.clone()
164        };
165        match task {
166            CompactionTask::ForceFullCompaction {
167                l0_sstables,
168                l1_sstables,
169            } => {
170                let mut l0_iters = Vec::with_capacity(l0_sstables.len());
171                for id in l0_sstables.iter() {
172                    l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
173                        snapshot.sstables.get(id).unwrap().clone(),
174                    )?));
175                }
176                let mut l1_iters = Vec::with_capacity(l1_sstables.len());
177                for id in l1_sstables.iter() {
178                    l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
179                }
180                let iter = TwoMergeIterator::create(
181                    MergeIterator::create(l0_iters),
182                    SstConcatIterator::create_and_seek_to_first(l1_iters)?,
183                )?;
184                self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
185            }
186            CompactionTask::Simple(SimpleLeveledCompactionTask {
187                upper_level,
188                upper_level_sst_ids,
189                lower_level: _,
190                lower_level_sst_ids,
191                ..
192            })
193            | CompactionTask::Leveled(LeveledCompactionTask {
194                upper_level,
195                upper_level_sst_ids,
196                lower_level: _,
197                lower_level_sst_ids,
198                ..
199            }) => match upper_level {
200                Some(_) => {
201                    let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
202                    for id in upper_level_sst_ids.iter() {
203                        upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
204                    }
205                    let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
206                    let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
207                    for id in lower_level_sst_ids.iter() {
208                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
209                    }
210                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
211                    self.compact_generate_sst_from_iter(
212                        TwoMergeIterator::create(upper_iter, lower_iter)?,
213                        task.compact_to_bottom_level(),
214                    )
215                }
216                None => {
217                    let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
218                    for id in upper_level_sst_ids.iter() {
219                        upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
220                            snapshot.sstables.get(id).unwrap().clone(),
221                        )?));
222                    }
223                    let upper_iter = MergeIterator::create(upper_iters);
224                    let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
225                    for id in lower_level_sst_ids.iter() {
226                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
227                    }
228                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
229                    self.compact_generate_sst_from_iter(
230                        TwoMergeIterator::create(upper_iter, lower_iter)?,
231                        task.compact_to_bottom_level(),
232                    )
233                }
234            },
235            CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
236                let mut iters = Vec::with_capacity(tiers.len());
237                for (_, tier_sst_ids) in tiers {
238                    let mut ssts = Vec::with_capacity(tier_sst_ids.len());
239                    for id in tier_sst_ids.iter() {
240                        ssts.push(snapshot.sstables.get(id).unwrap().clone());
241                    }
242                    iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
243                }
244                self.compact_generate_sst_from_iter(
245                    MergeIterator::create(iters),
246                    task.compact_to_bottom_level(),
247                )
248            }
249        }
250    }
251
252    pub fn force_full_compaction(&self) -> Result<()> {
253        let CompactionOptions::NoCompaction = self.options.compaction_options else {
254            panic!("full compaction can only be called with compaction is not enabled")
255        };
256
257        let snapshot = {
258            let state = self.state.read();
259            state.clone()
260        };
261
262        let l0_sstables = snapshot.l0_sstables.clone();
263        let l1_sstables = snapshot.levels[0].1.clone();
264        let compaction_task = CompactionTask::ForceFullCompaction {
265            l0_sstables: l0_sstables.clone(),
266            l1_sstables: l1_sstables.clone(),
267        };
268
269        println!("force full compaction: {:?}", compaction_task);
270
271        let sstables = self.compact(&compaction_task)?;
272        let mut ids = Vec::with_capacity(sstables.len());
273
274        {
275            let state_lock = self.state_lock.lock();
276            let mut state = self.state.read().as_ref().clone();
277            for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
278                let result = state.sstables.remove(sst);
279                assert!(result.is_some());
280            }
281            for new_sst in sstables {
282                ids.push(new_sst.sst_id());
283                let result = state.sstables.insert(new_sst.sst_id(), new_sst);
284                assert!(result.is_none());
285            }
286            assert_eq!(l1_sstables, state.levels[0].1);
287            state.levels[0].1 = ids.clone();
288            let mut l0_sstables_map = l0_sstables.iter().copied().collect::<HashSet<_>>();
289            state.l0_sstables = state
290                .l0_sstables
291                .iter()
292                .filter(|x| !l0_sstables_map.remove(x))
293                .copied()
294                .collect::<Vec<_>>();
295            assert!(l0_sstables_map.is_empty());
296            *self.state.write() = Arc::new(state);
297            self.sync_dir()?;
298            self.manifest.as_ref().unwrap().add_record(
299                &state_lock,
300                ManifestRecord::Compaction(compaction_task, ids.clone()),
301            )?;
302        }
303        for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
304            std::fs::remove_file(self.path_of_sst(*sst))?;
305        }
306
307        println!("force full compaction done, new SSTs: {:?}", ids);
308
309        Ok(())
310    }
311
312    fn trigger_compaction(&self) -> Result<()> {
313        let snapshot = {
314            let state = self.state.read();
315            state.clone()
316        };
317        let task = self
318            .compaction_controller
319            .generate_compaction_task(&snapshot);
320        let Some(task) = task else {
321            return Ok(());
322        };
323        self.dump_structure();
324        println!("running compaction task: {:?}", task);
325        let sstables = self.compact(&task)?;
326        let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
327        let ssts_to_remove = {
328            let state_lock = self.state_lock.lock();
329            let mut snapshot = self.state.read().as_ref().clone();
330            let mut new_sst_ids = Vec::new();
331            for file_to_add in sstables {
332                new_sst_ids.push(file_to_add.sst_id());
333                let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
334                assert!(result.is_none());
335            }
336            let (mut snapshot, files_to_remove) = self
337                .compaction_controller
338                .apply_compaction_result(&snapshot, &task, &output);
339            let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
340            for file_to_remove in &files_to_remove {
341                let result = snapshot.sstables.remove(file_to_remove);
342                assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
343                ssts_to_remove.push(result.unwrap());
344            }
345            let mut state = self.state.write();
346            *state = Arc::new(snapshot);
347            drop(state);
348            self.sync_dir()?;
349            self.manifest
350                .as_ref()
351                .unwrap()
352                .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
353            ssts_to_remove
354        };
355        println!(
356            "compaction finished: {} files removed, {} files added, output={:?}",
357            ssts_to_remove.len(),
358            output.len(),
359            output
360        );
361        for sst in ssts_to_remove {
362            std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
363        }
364        self.sync_dir()?;
365
366        Ok(())
367    }
368
369    pub(crate) fn spawn_compaction_thread(
370        self: &Arc<Self>,
371        rx: crossbeam_channel::Receiver<()>,
372    ) -> Result<Option<std::thread::JoinHandle<()>>> {
373        if let CompactionOptions::Leveled(_)
374        | CompactionOptions::Simple(_)
375        | CompactionOptions::Tiered(_) = self.options.compaction_options
376        {
377            let this = self.clone();
378            let handle = std::thread::spawn(move || {
379                let ticker = crossbeam_channel::tick(Duration::from_millis(50));
380                loop {
381                    crossbeam_channel::select! {
382                        recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
383                            eprintln!("compaction failed: {}", e);
384                        },
385                        recv(rx) -> _ => return
386                    }
387                }
388            });
389            return Ok(Some(handle));
390        }
391        Ok(None)
392    }
393
394    fn trigger_flush(&self) -> Result<()> {
395        let res = {
396            let state = self.state.read();
397            state.imm_memtables.len() >= self.options.num_memtable_limit
398        };
399        if res {
400            self.force_flush_next_imm_memtable()?;
401        }
402
403        Ok(())
404    }
405
406    pub(crate) fn spawn_flush_thread(
407        self: &Arc<Self>,
408        rx: crossbeam_channel::Receiver<()>,
409    ) -> Result<Option<std::thread::JoinHandle<()>>> {
410        let this = self.clone();
411        let handle = std::thread::spawn(move || {
412            let ticker = crossbeam_channel::tick(Duration::from_millis(50));
413            loop {
414                crossbeam_channel::select! {
415                    recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
416                        eprintln!("flush failed: {}", e);
417                    },
418                    recv(rx) -> _ => return
419                }
420            }
421        });
422        Ok(Some(handle))
423    }
424}