1mod leveled;
2mod simple_leveled;
3mod tiered;
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
11use serde::{Deserialize, Serialize};
12pub use simple_leveled::{
13 SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
14};
15pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
16
17use crate::iterators::concat_iterator::SstConcatIterator;
18use crate::iterators::merge_iterator::MergeIterator;
19use crate::iterators::two_merge_iterator::TwoMergeIterator;
20use crate::iterators::StorageIterator;
21use crate::key::KeySlice;
22use crate::lsm_storage::{LsmStorageInner, LsmStorageState};
23use crate::manifest::ManifestRecord;
24use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
25
/// One unit of compaction work: which SSTs to merge and where the result goes.
///
/// Tasks are produced by a `CompactionController` (or built directly by
/// `force_full_compaction`), executed by `LsmStorageInner::compact`, and then persisted via
/// `ManifestRecord::Compaction`. Because the task is serialized into the manifest, renaming or
/// reordering variants/fields may break reading existing manifests (how sensitive this is
/// depends on the manifest's serde format, which is not visible here).
#[derive(Debug, Serialize, Deserialize)]
pub enum CompactionTask {
    /// Task produced by the leveled compaction controller.
    Leveled(LeveledCompactionTask),
    /// Task produced by the tiered compaction controller.
    Tiered(TieredCompactionTask),
    /// Task produced by the simple leveled compaction controller.
    Simple(SimpleLeveledCompactionTask),
    /// Merge the listed L0 SSTs together with the listed L1 SSTs; the output replaces L1.
    ForceFullCompaction {
        /// Ids of the L0 SSTs taken as input.
        l0_sstables: Vec<usize>,
        /// Ids of the L1 SSTs taken as input (all of `levels[0]` when the task was created).
        l1_sstables: Vec<usize>,
    },
}
36
37impl CompactionTask {
38 fn compact_to_bottom_level(&self) -> bool {
39 match self {
40 CompactionTask::ForceFullCompaction { .. } => true,
41 CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
42 CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
43 CompactionTask::Tiered(task) => task.bottom_tier_included,
44 }
45 }
46}
47
/// The active compaction strategy.
///
/// It decides which compaction to run next (`generate_compaction_task`) and how to fold a
/// finished task back into the LSM state (`apply_compaction_result`).
pub(crate) enum CompactionController {
    /// Leveled compaction strategy.
    Leveled(LeveledCompactionController),
    /// Tiered compaction strategy; flushed memtables do not go to L0 in this mode
    /// (see `flush_to_l0`).
    Tiered(TieredCompactionController),
    /// Simple leveled compaction strategy.
    Simple(SimpleLeveledCompactionController),
    /// No automatic compaction. `generate_compaction_task` must not be called in this mode.
    NoCompaction,
}
54
55impl CompactionController {
56 pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
57 match self {
58 CompactionController::Leveled(ctrl) => ctrl
59 .generate_compaction_task(snapshot)
60 .map(CompactionTask::Leveled),
61 CompactionController::Simple(ctrl) => ctrl
62 .generate_compaction_task(snapshot)
63 .map(CompactionTask::Simple),
64 CompactionController::Tiered(ctrl) => ctrl
65 .generate_compaction_task(snapshot)
66 .map(CompactionTask::Tiered),
67 CompactionController::NoCompaction => unreachable!(),
68 }
69 }
70
71 pub fn apply_compaction_result(
72 &self,
73 snapshot: &LsmStorageState,
74 task: &CompactionTask,
75 output: &[usize],
76 ) -> (LsmStorageState, Vec<usize>) {
77 match (self, task) {
78 (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
79 ctrl.apply_compaction_result(snapshot, task, output)
80 }
81 (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
82 ctrl.apply_compaction_result(snapshot, task, output)
83 }
84 (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
85 ctrl.apply_compaction_result(snapshot, task, output)
86 }
87 _ => unreachable!(),
88 }
89 }
90}
91
92impl CompactionController {
93 pub fn flush_to_l0(&self) -> bool {
94 matches!(
95 self,
96 Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction
97 )
98 }
99}
100
/// User-selected compaction strategy and its configuration.
///
/// With any of the three strategy variants, `spawn_compaction_thread` starts a background
/// compaction thread. With `NoCompaction` it does not, and `force_full_compaction` may only
/// be called in that mode.
#[derive(Debug, Clone)]
pub enum CompactionOptions {
    /// Leveled compaction, configured by `LeveledCompactionOptions`.
    Leveled(LeveledCompactionOptions),
    /// Tiered compaction, configured by `TieredCompactionOptions`.
    Tiered(TieredCompactionOptions),
    /// Simple leveled compaction, configured by `SimpleLeveledCompactionOptions`.
    Simple(SimpleLeveledCompactionOptions),
    /// No background compaction; compaction happens only via `force_full_compaction`.
    NoCompaction,
}
113
114impl LsmStorageInner {
115 fn compact_generate_sst_from_iter(
116 &self,
117 mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
118 compact_to_bottom_level: bool,
119 ) -> Result<Vec<Arc<SsTable>>> {
120 let mut builder = None;
121 let mut new_sst = Vec::new();
122
123 while iter.is_valid() {
124 if builder.is_none() {
125 builder = Some(SsTableBuilder::new(self.options.block_size));
126 }
127 let builder_inner = builder.as_mut().unwrap();
128 if compact_to_bottom_level {
129 if !iter.value().is_empty() {
130 builder_inner.add(iter.key(), iter.value());
131 }
132 } else {
133 builder_inner.add(iter.key(), iter.value());
134 }
135 iter.next()?;
136
137 if builder_inner.estimated_size() >= self.options.target_sst_size {
138 let sst_id = self.next_sst_id();
139 let builder = builder.take().unwrap();
140 let sst = Arc::new(builder.build(
141 sst_id,
142 Some(self.block_cache.clone()),
143 self.path_of_sst(sst_id),
144 )?);
145 new_sst.push(sst);
146 }
147 }
148 if let Some(builder) = builder {
149 let sst_id = self.next_sst_id(); let sst = Arc::new(builder.build(
151 sst_id,
152 Some(self.block_cache.clone()),
153 self.path_of_sst(sst_id),
154 )?);
155 new_sst.push(sst);
156 }
157 Ok(new_sst)
158 }
159
160 fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
161 let snapshot = {
162 let state = self.state.read();
163 state.clone()
164 };
165 match task {
166 CompactionTask::ForceFullCompaction {
167 l0_sstables,
168 l1_sstables,
169 } => {
170 let mut l0_iters = Vec::with_capacity(l0_sstables.len());
171 for id in l0_sstables.iter() {
172 l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
173 snapshot.sstables.get(id).unwrap().clone(),
174 )?));
175 }
176 let mut l1_iters = Vec::with_capacity(l1_sstables.len());
177 for id in l1_sstables.iter() {
178 l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
179 }
180 let iter = TwoMergeIterator::create(
181 MergeIterator::create(l0_iters),
182 SstConcatIterator::create_and_seek_to_first(l1_iters)?,
183 )?;
184 self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
185 }
186 CompactionTask::Simple(SimpleLeveledCompactionTask {
187 upper_level,
188 upper_level_sst_ids,
189 lower_level: _,
190 lower_level_sst_ids,
191 ..
192 })
193 | CompactionTask::Leveled(LeveledCompactionTask {
194 upper_level,
195 upper_level_sst_ids,
196 lower_level: _,
197 lower_level_sst_ids,
198 ..
199 }) => match upper_level {
200 Some(_) => {
201 let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
202 for id in upper_level_sst_ids.iter() {
203 upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
204 }
205 let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
206 let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
207 for id in lower_level_sst_ids.iter() {
208 lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
209 }
210 let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
211 self.compact_generate_sst_from_iter(
212 TwoMergeIterator::create(upper_iter, lower_iter)?,
213 task.compact_to_bottom_level(),
214 )
215 }
216 None => {
217 let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
218 for id in upper_level_sst_ids.iter() {
219 upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
220 snapshot.sstables.get(id).unwrap().clone(),
221 )?));
222 }
223 let upper_iter = MergeIterator::create(upper_iters);
224 let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
225 for id in lower_level_sst_ids.iter() {
226 lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
227 }
228 let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
229 self.compact_generate_sst_from_iter(
230 TwoMergeIterator::create(upper_iter, lower_iter)?,
231 task.compact_to_bottom_level(),
232 )
233 }
234 },
235 CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
236 let mut iters = Vec::with_capacity(tiers.len());
237 for (_, tier_sst_ids) in tiers {
238 let mut ssts = Vec::with_capacity(tier_sst_ids.len());
239 for id in tier_sst_ids.iter() {
240 ssts.push(snapshot.sstables.get(id).unwrap().clone());
241 }
242 iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
243 }
244 self.compact_generate_sst_from_iter(
245 MergeIterator::create(iters),
246 task.compact_to_bottom_level(),
247 )
248 }
249 }
250 }
251
252 pub fn force_full_compaction(&self) -> Result<()> {
253 let CompactionOptions::NoCompaction = self.options.compaction_options else {
254 panic!("full compaction can only be called with compaction is not enabled")
255 };
256
257 let snapshot = {
258 let state = self.state.read();
259 state.clone()
260 };
261
262 let l0_sstables = snapshot.l0_sstables.clone();
263 let l1_sstables = snapshot.levels[0].1.clone();
264 let compaction_task = CompactionTask::ForceFullCompaction {
265 l0_sstables: l0_sstables.clone(),
266 l1_sstables: l1_sstables.clone(),
267 };
268
269 println!("force full compaction: {:?}", compaction_task);
270
271 let sstables = self.compact(&compaction_task)?;
272 let mut ids = Vec::with_capacity(sstables.len());
273
274 {
275 let state_lock = self.state_lock.lock();
276 let mut state = self.state.read().as_ref().clone();
277 for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
278 let result = state.sstables.remove(sst);
279 assert!(result.is_some());
280 }
281 for new_sst in sstables {
282 ids.push(new_sst.sst_id());
283 let result = state.sstables.insert(new_sst.sst_id(), new_sst);
284 assert!(result.is_none());
285 }
286 assert_eq!(l1_sstables, state.levels[0].1);
287 state.levels[0].1 = ids.clone();
288 let mut l0_sstables_map = l0_sstables.iter().copied().collect::<HashSet<_>>();
289 state.l0_sstables = state
290 .l0_sstables
291 .iter()
292 .filter(|x| !l0_sstables_map.remove(x))
293 .copied()
294 .collect::<Vec<_>>();
295 assert!(l0_sstables_map.is_empty());
296 *self.state.write() = Arc::new(state);
297 self.sync_dir()?;
298 self.manifest.as_ref().unwrap().add_record(
299 &state_lock,
300 ManifestRecord::Compaction(compaction_task, ids.clone()),
301 )?;
302 }
303 for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
304 std::fs::remove_file(self.path_of_sst(*sst))?;
305 }
306
307 println!("force full compaction done, new SSTs: {:?}", ids);
308
309 Ok(())
310 }
311
312 fn trigger_compaction(&self) -> Result<()> {
313 let snapshot = {
314 let state = self.state.read();
315 state.clone()
316 };
317 let task = self
318 .compaction_controller
319 .generate_compaction_task(&snapshot);
320 let Some(task) = task else {
321 return Ok(());
322 };
323 self.dump_structure();
324 println!("running compaction task: {:?}", task);
325 let sstables = self.compact(&task)?;
326 let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
327 let ssts_to_remove = {
328 let state_lock = self.state_lock.lock();
329 let mut snapshot = self.state.read().as_ref().clone();
330 let mut new_sst_ids = Vec::new();
331 for file_to_add in sstables {
332 new_sst_ids.push(file_to_add.sst_id());
333 let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
334 assert!(result.is_none());
335 }
336 let (mut snapshot, files_to_remove) = self
337 .compaction_controller
338 .apply_compaction_result(&snapshot, &task, &output);
339 let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
340 for file_to_remove in &files_to_remove {
341 let result = snapshot.sstables.remove(file_to_remove);
342 assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
343 ssts_to_remove.push(result.unwrap());
344 }
345 let mut state = self.state.write();
346 *state = Arc::new(snapshot);
347 drop(state);
348 self.sync_dir()?;
349 self.manifest
350 .as_ref()
351 .unwrap()
352 .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
353 ssts_to_remove
354 };
355 println!(
356 "compaction finished: {} files removed, {} files added, output={:?}",
357 ssts_to_remove.len(),
358 output.len(),
359 output
360 );
361 for sst in ssts_to_remove {
362 std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
363 }
364 self.sync_dir()?;
365
366 Ok(())
367 }
368
369 pub(crate) fn spawn_compaction_thread(
370 self: &Arc<Self>,
371 rx: crossbeam_channel::Receiver<()>,
372 ) -> Result<Option<std::thread::JoinHandle<()>>> {
373 if let CompactionOptions::Leveled(_)
374 | CompactionOptions::Simple(_)
375 | CompactionOptions::Tiered(_) = self.options.compaction_options
376 {
377 let this = self.clone();
378 let handle = std::thread::spawn(move || {
379 let ticker = crossbeam_channel::tick(Duration::from_millis(50));
380 loop {
381 crossbeam_channel::select! {
382 recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
383 eprintln!("compaction failed: {}", e);
384 },
385 recv(rx) -> _ => return
386 }
387 }
388 });
389 return Ok(Some(handle));
390 }
391 Ok(None)
392 }
393
394 fn trigger_flush(&self) -> Result<()> {
395 let res = {
396 let state = self.state.read();
397 state.imm_memtables.len() >= self.options.num_memtable_limit
398 };
399 if res {
400 self.force_flush_next_imm_memtable()?;
401 }
402
403 Ok(())
404 }
405
406 pub(crate) fn spawn_flush_thread(
407 self: &Arc<Self>,
408 rx: crossbeam_channel::Receiver<()>,
409 ) -> Result<Option<std::thread::JoinHandle<()>>> {
410 let this = self.clone();
411 let handle = std::thread::spawn(move || {
412 let ticker = crossbeam_channel::tick(Duration::from_millis(50));
413 loop {
414 crossbeam_channel::select! {
415 recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
416 eprintln!("flush failed: {}", e);
417 },
418 recv(rx) -> _ => return
419 }
420 }
421 });
422 Ok(Some(handle))
423 }
424}