mini_lsm_mvcc/
compact.rs

1mod leveled;
2mod simple_leveled;
3mod tiered;
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
11use serde::{Deserialize, Serialize};
12pub use simple_leveled::{
13    SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
14};
15pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
16
17use crate::iterators::concat_iterator::SstConcatIterator;
18use crate::iterators::merge_iterator::MergeIterator;
19use crate::iterators::two_merge_iterator::TwoMergeIterator;
20use crate::iterators::StorageIterator;
21use crate::key::KeySlice;
22use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState};
23use crate::manifest::ManifestRecord;
24use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
25
26#[derive(Debug, Serialize, Deserialize)]
27pub enum CompactionTask {
28    Leveled(LeveledCompactionTask),
29    Tiered(TieredCompactionTask),
30    Simple(SimpleLeveledCompactionTask),
31    ForceFullCompaction {
32        l0_sstables: Vec<usize>,
33        l1_sstables: Vec<usize>,
34    },
35}
36
37impl CompactionTask {
38    fn compact_to_bottom_level(&self) -> bool {
39        match self {
40            CompactionTask::ForceFullCompaction { .. } => true,
41            CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
42            CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
43            CompactionTask::Tiered(task) => task.bottom_tier_included,
44        }
45    }
46}
47
48pub(crate) enum CompactionController {
49    Leveled(LeveledCompactionController),
50    Tiered(TieredCompactionController),
51    Simple(SimpleLeveledCompactionController),
52    NoCompaction,
53}
54
55impl CompactionController {
56    pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
57        match self {
58            CompactionController::Leveled(ctrl) => ctrl
59                .generate_compaction_task(snapshot)
60                .map(CompactionTask::Leveled),
61            CompactionController::Simple(ctrl) => ctrl
62                .generate_compaction_task(snapshot)
63                .map(CompactionTask::Simple),
64            CompactionController::Tiered(ctrl) => ctrl
65                .generate_compaction_task(snapshot)
66                .map(CompactionTask::Tiered),
67            CompactionController::NoCompaction => unreachable!(),
68        }
69    }
70
71    pub fn apply_compaction_result(
72        &self,
73        snapshot: &LsmStorageState,
74        task: &CompactionTask,
75        output: &[usize],
76    ) -> (LsmStorageState, Vec<usize>) {
77        match (self, task) {
78            (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
79                ctrl.apply_compaction_result(snapshot, task, output)
80            }
81            (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
82                ctrl.apply_compaction_result(snapshot, task, output)
83            }
84            (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
85                ctrl.apply_compaction_result(snapshot, task, output)
86            }
87            _ => unreachable!(),
88        }
89    }
90}
91
92impl CompactionController {
93    pub fn flush_to_l0(&self) -> bool {
94        matches!(
95            self,
96            Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction
97        )
98    }
99}
100
101#[derive(Debug, Clone)]
102pub enum CompactionOptions {
103    /// Leveled compaction with partial compaction + dynamic level support (= RocksDB's Leveled
104    /// Compaction)
105    Leveled(LeveledCompactionOptions),
106    /// Tiered compaction (= RocksDB's universal compaction)
107    Tiered(TieredCompactionOptions),
108    /// Simple leveled compaction
109    Simple(SimpleLeveledCompactionOptions),
110    /// In no compaction mode (week 1), always flush to L0
111    NoCompaction,
112}
113
114impl LsmStorageInner {
115    fn compact_generate_sst_from_iter(
116        &self,
117        mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
118        compact_to_bottom_level: bool,
119    ) -> Result<Vec<Arc<SsTable>>> {
120        let mut builder = None;
121        let mut new_sst = Vec::new();
122        let watermark = self.mvcc().watermark();
123        let mut last_key = Vec::<u8>::new();
124        let mut first_key_below_watermark = false;
125        let compaction_filters = self.compaction_filters.lock().clone();
126        'outer: while iter.is_valid() {
127            if builder.is_none() {
128                builder = Some(SsTableBuilder::new(self.options.block_size));
129            }
130
131            let same_as_last_key = iter.key().key_ref() == last_key;
132            if !same_as_last_key {
133                first_key_below_watermark = true;
134            }
135
136            if compact_to_bottom_level
137                && !same_as_last_key
138                && iter.key().ts() <= watermark
139                && iter.value().is_empty()
140            {
141                last_key.clear();
142                last_key.extend(iter.key().key_ref());
143                iter.next()?;
144                first_key_below_watermark = false;
145                continue;
146            }
147
148            if iter.key().ts() <= watermark {
149                if same_as_last_key && !first_key_below_watermark {
150                    iter.next()?;
151                    continue;
152                }
153
154                first_key_below_watermark = false;
155
156                if !compaction_filters.is_empty() {
157                    for filter in &compaction_filters {
158                        match filter {
159                            CompactionFilter::Prefix(x) => {
160                                if iter.key().key_ref().starts_with(x) {
161                                    iter.next()?;
162                                    continue 'outer;
163                                }
164                            }
165                        }
166                    }
167                }
168            }
169
170            let builder_inner = builder.as_mut().unwrap();
171
172            if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
173                let sst_id = self.next_sst_id();
174                let old_builder = builder.take().unwrap();
175                let sst = Arc::new(old_builder.build(
176                    sst_id,
177                    Some(self.block_cache.clone()),
178                    self.path_of_sst(sst_id),
179                )?);
180                new_sst.push(sst);
181                builder = Some(SsTableBuilder::new(self.options.block_size));
182            }
183
184            let builder_inner = builder.as_mut().unwrap();
185            builder_inner.add(iter.key(), iter.value());
186
187            if !same_as_last_key {
188                last_key.clear();
189                last_key.extend(iter.key().key_ref());
190            }
191
192            iter.next()?;
193        }
194        if let Some(builder) = builder {
195            let sst_id = self.next_sst_id(); // lock dropped here
196            let sst = Arc::new(builder.build(
197                sst_id,
198                Some(self.block_cache.clone()),
199                self.path_of_sst(sst_id),
200            )?);
201            new_sst.push(sst);
202        }
203        Ok(new_sst)
204    }
205
206    fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
207        let snapshot = {
208            let state = self.state.read();
209            state.clone()
210        };
211        match task {
212            CompactionTask::ForceFullCompaction {
213                l0_sstables,
214                l1_sstables,
215            } => {
216                let mut l0_iters = Vec::with_capacity(l0_sstables.len());
217                for id in l0_sstables.iter() {
218                    l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
219                        snapshot.sstables.get(id).unwrap().clone(),
220                    )?));
221                }
222                let mut l1_iters = Vec::with_capacity(l1_sstables.len());
223                for id in l1_sstables.iter() {
224                    l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
225                }
226                let iter = TwoMergeIterator::create(
227                    MergeIterator::create(l0_iters),
228                    SstConcatIterator::create_and_seek_to_first(l1_iters)?,
229                )?;
230                self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
231            }
232            CompactionTask::Simple(SimpleLeveledCompactionTask {
233                upper_level,
234                upper_level_sst_ids,
235                lower_level: _,
236                lower_level_sst_ids,
237                ..
238            })
239            | CompactionTask::Leveled(LeveledCompactionTask {
240                upper_level,
241                upper_level_sst_ids,
242                lower_level: _,
243                lower_level_sst_ids,
244                ..
245            }) => match upper_level {
246                Some(_) => {
247                    let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
248                    for id in upper_level_sst_ids.iter() {
249                        upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
250                    }
251                    let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
252                    let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
253                    for id in lower_level_sst_ids.iter() {
254                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
255                    }
256                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
257                    self.compact_generate_sst_from_iter(
258                        TwoMergeIterator::create(upper_iter, lower_iter)?,
259                        task.compact_to_bottom_level(),
260                    )
261                }
262                None => {
263                    let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
264                    for id in upper_level_sst_ids.iter() {
265                        upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
266                            snapshot.sstables.get(id).unwrap().clone(),
267                        )?));
268                    }
269                    let upper_iter = MergeIterator::create(upper_iters);
270                    let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
271                    for id in lower_level_sst_ids.iter() {
272                        lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
273                    }
274                    let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
275                    self.compact_generate_sst_from_iter(
276                        TwoMergeIterator::create(upper_iter, lower_iter)?,
277                        task.compact_to_bottom_level(),
278                    )
279                }
280            },
281            CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
282                let mut iters = Vec::with_capacity(tiers.len());
283                for (_, tier_sst_ids) in tiers {
284                    let mut ssts = Vec::with_capacity(tier_sst_ids.len());
285                    for id in tier_sst_ids.iter() {
286                        ssts.push(snapshot.sstables.get(id).unwrap().clone());
287                    }
288                    iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
289                }
290                self.compact_generate_sst_from_iter(
291                    MergeIterator::create(iters),
292                    task.compact_to_bottom_level(),
293                )
294            }
295        }
296    }
297
298    pub fn force_full_compaction(&self) -> Result<()> {
299        let CompactionOptions::NoCompaction = self.options.compaction_options else {
300            panic!("full compaction can only be called with compaction is not enabled")
301        };
302
303        let snapshot = {
304            let state = self.state.read();
305            state.clone()
306        };
307
308        let l0_sstables = snapshot.l0_sstables.clone();
309        let l1_sstables = snapshot.levels[0].1.clone();
310        let compaction_task = CompactionTask::ForceFullCompaction {
311            l0_sstables: l0_sstables.clone(),
312            l1_sstables: l1_sstables.clone(),
313        };
314
315        println!("force full compaction: {:?}", compaction_task);
316
317        let sstables = self.compact(&compaction_task)?;
318        let mut ids = Vec::with_capacity(sstables.len());
319
320        {
321            let state_lock = self.state_lock.lock();
322            let mut state = self.state.read().as_ref().clone();
323            for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
324                let result = state.sstables.remove(sst);
325                assert!(result.is_some());
326            }
327            for new_sst in sstables {
328                ids.push(new_sst.sst_id());
329                let result = state.sstables.insert(new_sst.sst_id(), new_sst);
330                assert!(result.is_none());
331            }
332            assert_eq!(l1_sstables, state.levels[0].1);
333            state.levels[0].1 = ids.clone();
334            let mut l0_sstables_map = l0_sstables.iter().copied().collect::<HashSet<_>>();
335            state.l0_sstables = state
336                .l0_sstables
337                .iter()
338                .filter(|x| !l0_sstables_map.remove(x))
339                .copied()
340                .collect::<Vec<_>>();
341            assert!(l0_sstables_map.is_empty());
342            *self.state.write() = Arc::new(state);
343            self.sync_dir()?;
344            self.manifest.as_ref().unwrap().add_record(
345                &state_lock,
346                ManifestRecord::Compaction(compaction_task, ids.clone()),
347            )?;
348        }
349        for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
350            std::fs::remove_file(self.path_of_sst(*sst))?;
351        }
352
353        println!("force full compaction done, new SSTs: {:?}", ids);
354
355        Ok(())
356    }
357
358    fn trigger_compaction(&self) -> Result<()> {
359        let snapshot = {
360            let state = self.state.read();
361            state.clone()
362        };
363        let task = self
364            .compaction_controller
365            .generate_compaction_task(&snapshot);
366        let Some(task) = task else {
367            return Ok(());
368        };
369        self.dump_structure();
370        println!("running compaction task: {:?}", task);
371        let sstables = self.compact(&task)?;
372        let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
373        let ssts_to_remove = {
374            let state_lock = self.state_lock.lock();
375            let mut snapshot = self.state.read().as_ref().clone();
376            let mut new_sst_ids = Vec::new();
377            for file_to_add in sstables {
378                new_sst_ids.push(file_to_add.sst_id());
379                let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
380                assert!(result.is_none());
381            }
382            let (mut snapshot, files_to_remove) = self
383                .compaction_controller
384                .apply_compaction_result(&snapshot, &task, &output);
385            let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
386            for file_to_remove in &files_to_remove {
387                let result = snapshot.sstables.remove(file_to_remove);
388                assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
389                ssts_to_remove.push(result.unwrap());
390            }
391            let mut state = self.state.write();
392            *state = Arc::new(snapshot);
393            drop(state);
394            self.sync_dir()?;
395            self.manifest()
396                .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
397            ssts_to_remove
398        };
399        println!(
400            "compaction finished: {} files removed, {} files added, output={:?}",
401            ssts_to_remove.len(),
402            output.len(),
403            output
404        );
405        for sst in ssts_to_remove {
406            std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
407        }
408        self.sync_dir()?;
409
410        Ok(())
411    }
412
413    pub(crate) fn spawn_compaction_thread(
414        self: &Arc<Self>,
415        rx: crossbeam_channel::Receiver<()>,
416    ) -> Result<Option<std::thread::JoinHandle<()>>> {
417        if let CompactionOptions::Leveled(_)
418        | CompactionOptions::Simple(_)
419        | CompactionOptions::Tiered(_) = self.options.compaction_options
420        {
421            let this = self.clone();
422            let handle = std::thread::spawn(move || {
423                let ticker = crossbeam_channel::tick(Duration::from_millis(50));
424                loop {
425                    crossbeam_channel::select! {
426                        recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
427                            eprintln!("compaction failed: {}", e);
428                        },
429                        recv(rx) -> _ => return
430                    }
431                }
432            });
433            return Ok(Some(handle));
434        }
435        Ok(None)
436    }
437
438    fn trigger_flush(&self) -> Result<()> {
439        let res = {
440            let state = self.state.read();
441            state.imm_memtables.len() >= self.options.num_memtable_limit
442        };
443        if res {
444            self.force_flush_next_imm_memtable()?;
445        }
446
447        Ok(())
448    }
449
450    pub(crate) fn spawn_flush_thread(
451        self: &Arc<Self>,
452        rx: crossbeam_channel::Receiver<()>,
453    ) -> Result<Option<std::thread::JoinHandle<()>>> {
454        let this = self.clone();
455        let handle = std::thread::spawn(move || {
456            let ticker = crossbeam_channel::tick(Duration::from_millis(50));
457            loop {
458                crossbeam_channel::select! {
459                    recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
460                        eprintln!("flush failed: {}", e);
461                    },
462                    recv(rx) -> _ => return
463                }
464            }
465        });
466        Ok(Some(handle))
467    }
468}