1mod leveled;
2mod simple_leveled;
3mod tiered;
4
5use std::collections::HashSet;
6use std::sync::Arc;
7use std::time::Duration;
8
9use anyhow::Result;
10pub use leveled::{LeveledCompactionController, LeveledCompactionOptions, LeveledCompactionTask};
11use serde::{Deserialize, Serialize};
12pub use simple_leveled::{
13 SimpleLeveledCompactionController, SimpleLeveledCompactionOptions, SimpleLeveledCompactionTask,
14};
15pub use tiered::{TieredCompactionController, TieredCompactionOptions, TieredCompactionTask};
16
17use crate::iterators::concat_iterator::SstConcatIterator;
18use crate::iterators::merge_iterator::MergeIterator;
19use crate::iterators::two_merge_iterator::TwoMergeIterator;
20use crate::iterators::StorageIterator;
21use crate::key::KeySlice;
22use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState};
23use crate::manifest::ManifestRecord;
24use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
25
/// A unit of compaction work: the set of SSTs to merge, in the shape required by the strategy
/// that produced it.
///
/// The type is serializable; `force_full_compaction` and `trigger_compaction` store the task
/// inside a `ManifestRecord::Compaction` record together with the ids of the output SSTs.
#[derive(Debug, Serialize, Deserialize)]
pub enum CompactionTask {
    /// Task generated by a `LeveledCompactionController`.
    Leveled(LeveledCompactionTask),
    /// Task generated by a `TieredCompactionController`.
    Tiered(TieredCompactionTask),
    /// Task generated by a `SimpleLeveledCompactionController`.
    Simple(SimpleLeveledCompactionTask),
    /// Manually requested compaction built by `force_full_compaction`; it is always treated as
    /// compacting to the bottom level (see `compact_to_bottom_level`).
    ForceFullCompaction {
        /// Ids of the L0 SSTs to merge (copied from the state's `l0_sstables`).
        l0_sstables: Vec<usize>,
        /// Ids of the SSTs in the first level (`levels[0]`) to merge.
        l1_sstables: Vec<usize>,
    },
}
36
37impl CompactionTask {
38 fn compact_to_bottom_level(&self) -> bool {
39 match self {
40 CompactionTask::ForceFullCompaction { .. } => true,
41 CompactionTask::Leveled(task) => task.is_lower_level_bottom_level,
42 CompactionTask::Simple(task) => task.is_lower_level_bottom_level,
43 CompactionTask::Tiered(task) => task.bottom_tier_included,
44 }
45 }
46}
47
/// Strategy-specific compaction logic; each variant wraps the controller that generates tasks
/// for that strategy and applies their results to the storage state.
pub(crate) enum CompactionController {
    /// Leveled compaction.
    Leveled(LeveledCompactionController),
    /// Tiered compaction.
    Tiered(TieredCompactionController),
    /// Simple leveled compaction.
    Simple(SimpleLeveledCompactionController),
    /// No controller. `generate_compaction_task` and `apply_compaction_result` must not be
    /// called in this mode: both hit `unreachable!()`.
    NoCompaction,
}
54
55impl CompactionController {
56 pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
57 match self {
58 CompactionController::Leveled(ctrl) => ctrl
59 .generate_compaction_task(snapshot)
60 .map(CompactionTask::Leveled),
61 CompactionController::Simple(ctrl) => ctrl
62 .generate_compaction_task(snapshot)
63 .map(CompactionTask::Simple),
64 CompactionController::Tiered(ctrl) => ctrl
65 .generate_compaction_task(snapshot)
66 .map(CompactionTask::Tiered),
67 CompactionController::NoCompaction => unreachable!(),
68 }
69 }
70
71 pub fn apply_compaction_result(
72 &self,
73 snapshot: &LsmStorageState,
74 task: &CompactionTask,
75 output: &[usize],
76 ) -> (LsmStorageState, Vec<usize>) {
77 match (self, task) {
78 (CompactionController::Leveled(ctrl), CompactionTask::Leveled(task)) => {
79 ctrl.apply_compaction_result(snapshot, task, output)
80 }
81 (CompactionController::Simple(ctrl), CompactionTask::Simple(task)) => {
82 ctrl.apply_compaction_result(snapshot, task, output)
83 }
84 (CompactionController::Tiered(ctrl), CompactionTask::Tiered(task)) => {
85 ctrl.apply_compaction_result(snapshot, task, output)
86 }
87 _ => unreachable!(),
88 }
89 }
90}
91
92impl CompactionController {
93 pub fn flush_to_l0(&self) -> bool {
94 matches!(
95 self,
96 Self::Leveled(_) | Self::Simple(_) | Self::NoCompaction
97 )
98 }
99}
100
/// User-facing choice of compaction strategy, together with that strategy's options.
#[derive(Debug, Clone)]
pub enum CompactionOptions {
    /// Leveled compaction; a background compaction thread is spawned.
    Leveled(LeveledCompactionOptions),
    /// Tiered compaction; a background compaction thread is spawned.
    Tiered(TieredCompactionOptions),
    /// Simple leveled compaction; a background compaction thread is spawned.
    Simple(SimpleLeveledCompactionOptions),
    /// No automatic compaction: `spawn_compaction_thread` spawns nothing, and this is the only
    /// mode in which `force_full_compaction` may be called.
    NoCompaction,
}
113
114impl LsmStorageInner {
    /// Drains `iter` into one or more new SSTs, dropping versions that no longer need to be
    /// kept.
    ///
    /// * `iter` - merged view of the input SSTs. The logic below assumes that all versions of a
    ///   user key are adjacent and ordered newest first (TODO confirm against the iterator
    ///   implementations).
    /// * `compact_to_bottom_level` - when `true`, an entry with an empty value (presumably a
    ///   delete tombstone) at or below the watermark is dropped if it is the first version seen
    ///   for its user key.
    ///
    /// For each user key, every version with a timestamp above the MVCC watermark is kept, and
    /// only the first version encountered at or below the watermark is kept. Versions at or
    /// below the watermark are additionally subject to the registered compaction filters.
    ///
    /// Returns the SSTs built, in output order. Errors from the iterator or from building an SST
    /// are propagated.
    fn compact_generate_sst_from_iter(
        &self,
        mut iter: impl for<'a> StorageIterator<KeyType<'a> = KeySlice<'a>>,
        compact_to_bottom_level: bool,
    ) -> Result<Vec<Arc<SsTable>>> {
        let mut builder = None;
        let mut new_sst = Vec::new();
        let watermark = self.mvcc().watermark();
        // User-key bytes of the most recent key that was either written or dropped as a
        // bottom-level tombstone.
        let mut last_key = Vec::<u8>::new();
        // True from the moment a new user key is seen until one of its versions at or below the
        // watermark has been handled.
        let mut first_key_below_watermark = false;
        // Clone the filter list so the lock is not held while compacting.
        let compaction_filters = self.compaction_filters.lock().clone();
        'outer: while iter.is_valid() {
            if builder.is_none() {
                builder = Some(SsTableBuilder::new(self.options.block_size));
            }

            let same_as_last_key = iter.key().key_ref() == last_key;
            if !same_as_last_key {
                first_key_below_watermark = true;
            }

            // Bottom-level compaction: an empty-valued entry at or below the watermark that is
            // the first version seen for its key is dropped. Recording the key and clearing the
            // flag makes the remaining versions of this key at or below the watermark get
            // skipped by the check further down.
            if compact_to_bottom_level
                && !same_as_last_key
                && iter.key().ts() <= watermark
                && iter.value().is_empty()
            {
                last_key.clear();
                last_key.extend(iter.key().key_ref());
                iter.next()?;
                first_key_below_watermark = false;
                continue;
            }

            if iter.key().ts() <= watermark {
                // A version of this key at or below the watermark was already handled; skip
                // this one.
                if same_as_last_key && !first_key_below_watermark {
                    iter.next()?;
                    continue;
                }

                first_key_below_watermark = false;

                // Filters are only applied to versions at or below the watermark.
                if !compaction_filters.is_empty() {
                    for filter in &compaction_filters {
                        match filter {
                            CompactionFilter::Prefix(x) => {
                                if iter.key().key_ref().starts_with(x) {
                                    iter.next()?;
                                    continue 'outer;
                                }
                            }
                        }
                    }
                }
            }

            let builder_inner = builder.as_mut().unwrap();

            // Only cut a new SST when the current entry starts a new user key, so that all
            // versions of one user key end up in the same SST.
            if builder_inner.estimated_size() >= self.options.target_sst_size && !same_as_last_key {
                let sst_id = self.next_sst_id();
                let old_builder = builder.take().unwrap();
                let sst = Arc::new(old_builder.build(
                    sst_id,
                    Some(self.block_cache.clone()),
                    self.path_of_sst(sst_id),
                )?);
                new_sst.push(sst);
                builder = Some(SsTableBuilder::new(self.options.block_size));
            }

            let builder_inner = builder.as_mut().unwrap();
            builder_inner.add(iter.key(), iter.value());

            if !same_as_last_key {
                last_key.clear();
                last_key.extend(iter.key().key_ref());
            }

            iter.next()?;
        }
        // Flush whatever is left in the last builder.
        // NOTE(review): a builder is created at the top of every loop iteration, so this can
        // build an SST even if every entry since the last cut was skipped — confirm that
        // building an empty SST is acceptable.
        if let Some(builder) = builder {
            let sst_id = self.next_sst_id();
            let sst = Arc::new(builder.build(
                sst_id,
                Some(self.block_cache.clone()),
                self.path_of_sst(sst_id),
            )?);
            new_sst.push(sst);
        }
        Ok(new_sst)
    }
205
206 fn compact(&self, task: &CompactionTask) -> Result<Vec<Arc<SsTable>>> {
207 let snapshot = {
208 let state = self.state.read();
209 state.clone()
210 };
211 match task {
212 CompactionTask::ForceFullCompaction {
213 l0_sstables,
214 l1_sstables,
215 } => {
216 let mut l0_iters = Vec::with_capacity(l0_sstables.len());
217 for id in l0_sstables.iter() {
218 l0_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
219 snapshot.sstables.get(id).unwrap().clone(),
220 )?));
221 }
222 let mut l1_iters = Vec::with_capacity(l1_sstables.len());
223 for id in l1_sstables.iter() {
224 l1_iters.push(snapshot.sstables.get(id).unwrap().clone());
225 }
226 let iter = TwoMergeIterator::create(
227 MergeIterator::create(l0_iters),
228 SstConcatIterator::create_and_seek_to_first(l1_iters)?,
229 )?;
230 self.compact_generate_sst_from_iter(iter, task.compact_to_bottom_level())
231 }
232 CompactionTask::Simple(SimpleLeveledCompactionTask {
233 upper_level,
234 upper_level_sst_ids,
235 lower_level: _,
236 lower_level_sst_ids,
237 ..
238 })
239 | CompactionTask::Leveled(LeveledCompactionTask {
240 upper_level,
241 upper_level_sst_ids,
242 lower_level: _,
243 lower_level_sst_ids,
244 ..
245 }) => match upper_level {
246 Some(_) => {
247 let mut upper_ssts = Vec::with_capacity(upper_level_sst_ids.len());
248 for id in upper_level_sst_ids.iter() {
249 upper_ssts.push(snapshot.sstables.get(id).unwrap().clone());
250 }
251 let upper_iter = SstConcatIterator::create_and_seek_to_first(upper_ssts)?;
252 let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
253 for id in lower_level_sst_ids.iter() {
254 lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
255 }
256 let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
257 self.compact_generate_sst_from_iter(
258 TwoMergeIterator::create(upper_iter, lower_iter)?,
259 task.compact_to_bottom_level(),
260 )
261 }
262 None => {
263 let mut upper_iters = Vec::with_capacity(upper_level_sst_ids.len());
264 for id in upper_level_sst_ids.iter() {
265 upper_iters.push(Box::new(SsTableIterator::create_and_seek_to_first(
266 snapshot.sstables.get(id).unwrap().clone(),
267 )?));
268 }
269 let upper_iter = MergeIterator::create(upper_iters);
270 let mut lower_ssts = Vec::with_capacity(upper_level_sst_ids.len());
271 for id in lower_level_sst_ids.iter() {
272 lower_ssts.push(snapshot.sstables.get(id).unwrap().clone());
273 }
274 let lower_iter = SstConcatIterator::create_and_seek_to_first(lower_ssts)?;
275 self.compact_generate_sst_from_iter(
276 TwoMergeIterator::create(upper_iter, lower_iter)?,
277 task.compact_to_bottom_level(),
278 )
279 }
280 },
281 CompactionTask::Tiered(TieredCompactionTask { tiers, .. }) => {
282 let mut iters = Vec::with_capacity(tiers.len());
283 for (_, tier_sst_ids) in tiers {
284 let mut ssts = Vec::with_capacity(tier_sst_ids.len());
285 for id in tier_sst_ids.iter() {
286 ssts.push(snapshot.sstables.get(id).unwrap().clone());
287 }
288 iters.push(Box::new(SstConcatIterator::create_and_seek_to_first(ssts)?));
289 }
290 self.compact_generate_sst_from_iter(
291 MergeIterator::create(iters),
292 task.compact_to_bottom_level(),
293 )
294 }
295 }
296 }
297
298 pub fn force_full_compaction(&self) -> Result<()> {
299 let CompactionOptions::NoCompaction = self.options.compaction_options else {
300 panic!("full compaction can only be called with compaction is not enabled")
301 };
302
303 let snapshot = {
304 let state = self.state.read();
305 state.clone()
306 };
307
308 let l0_sstables = snapshot.l0_sstables.clone();
309 let l1_sstables = snapshot.levels[0].1.clone();
310 let compaction_task = CompactionTask::ForceFullCompaction {
311 l0_sstables: l0_sstables.clone(),
312 l1_sstables: l1_sstables.clone(),
313 };
314
315 println!("force full compaction: {:?}", compaction_task);
316
317 let sstables = self.compact(&compaction_task)?;
318 let mut ids = Vec::with_capacity(sstables.len());
319
320 {
321 let state_lock = self.state_lock.lock();
322 let mut state = self.state.read().as_ref().clone();
323 for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
324 let result = state.sstables.remove(sst);
325 assert!(result.is_some());
326 }
327 for new_sst in sstables {
328 ids.push(new_sst.sst_id());
329 let result = state.sstables.insert(new_sst.sst_id(), new_sst);
330 assert!(result.is_none());
331 }
332 assert_eq!(l1_sstables, state.levels[0].1);
333 state.levels[0].1 = ids.clone();
334 let mut l0_sstables_map = l0_sstables.iter().copied().collect::<HashSet<_>>();
335 state.l0_sstables = state
336 .l0_sstables
337 .iter()
338 .filter(|x| !l0_sstables_map.remove(x))
339 .copied()
340 .collect::<Vec<_>>();
341 assert!(l0_sstables_map.is_empty());
342 *self.state.write() = Arc::new(state);
343 self.sync_dir()?;
344 self.manifest.as_ref().unwrap().add_record(
345 &state_lock,
346 ManifestRecord::Compaction(compaction_task, ids.clone()),
347 )?;
348 }
349 for sst in l0_sstables.iter().chain(l1_sstables.iter()) {
350 std::fs::remove_file(self.path_of_sst(*sst))?;
351 }
352
353 println!("force full compaction done, new SSTs: {:?}", ids);
354
355 Ok(())
356 }
357
358 fn trigger_compaction(&self) -> Result<()> {
359 let snapshot = {
360 let state = self.state.read();
361 state.clone()
362 };
363 let task = self
364 .compaction_controller
365 .generate_compaction_task(&snapshot);
366 let Some(task) = task else {
367 return Ok(());
368 };
369 self.dump_structure();
370 println!("running compaction task: {:?}", task);
371 let sstables = self.compact(&task)?;
372 let output = sstables.iter().map(|x| x.sst_id()).collect::<Vec<_>>();
373 let ssts_to_remove = {
374 let state_lock = self.state_lock.lock();
375 let mut snapshot = self.state.read().as_ref().clone();
376 let mut new_sst_ids = Vec::new();
377 for file_to_add in sstables {
378 new_sst_ids.push(file_to_add.sst_id());
379 let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
380 assert!(result.is_none());
381 }
382 let (mut snapshot, files_to_remove) = self
383 .compaction_controller
384 .apply_compaction_result(&snapshot, &task, &output);
385 let mut ssts_to_remove = Vec::with_capacity(files_to_remove.len());
386 for file_to_remove in &files_to_remove {
387 let result = snapshot.sstables.remove(file_to_remove);
388 assert!(result.is_some(), "cannot remove {}.sst", file_to_remove);
389 ssts_to_remove.push(result.unwrap());
390 }
391 let mut state = self.state.write();
392 *state = Arc::new(snapshot);
393 drop(state);
394 self.sync_dir()?;
395 self.manifest()
396 .add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
397 ssts_to_remove
398 };
399 println!(
400 "compaction finished: {} files removed, {} files added, output={:?}",
401 ssts_to_remove.len(),
402 output.len(),
403 output
404 );
405 for sst in ssts_to_remove {
406 std::fs::remove_file(self.path_of_sst(sst.sst_id()))?;
407 }
408 self.sync_dir()?;
409
410 Ok(())
411 }
412
413 pub(crate) fn spawn_compaction_thread(
414 self: &Arc<Self>,
415 rx: crossbeam_channel::Receiver<()>,
416 ) -> Result<Option<std::thread::JoinHandle<()>>> {
417 if let CompactionOptions::Leveled(_)
418 | CompactionOptions::Simple(_)
419 | CompactionOptions::Tiered(_) = self.options.compaction_options
420 {
421 let this = self.clone();
422 let handle = std::thread::spawn(move || {
423 let ticker = crossbeam_channel::tick(Duration::from_millis(50));
424 loop {
425 crossbeam_channel::select! {
426 recv(ticker) -> _ => if let Err(e) = this.trigger_compaction() {
427 eprintln!("compaction failed: {}", e);
428 },
429 recv(rx) -> _ => return
430 }
431 }
432 });
433 return Ok(Some(handle));
434 }
435 Ok(None)
436 }
437
438 fn trigger_flush(&self) -> Result<()> {
439 let res = {
440 let state = self.state.read();
441 state.imm_memtables.len() >= self.options.num_memtable_limit
442 };
443 if res {
444 self.force_flush_next_imm_memtable()?;
445 }
446
447 Ok(())
448 }
449
450 pub(crate) fn spawn_flush_thread(
451 self: &Arc<Self>,
452 rx: crossbeam_channel::Receiver<()>,
453 ) -> Result<Option<std::thread::JoinHandle<()>>> {
454 let this = self.clone();
455 let handle = std::thread::spawn(move || {
456 let ticker = crossbeam_channel::tick(Duration::from_millis(50));
457 loop {
458 crossbeam_channel::select! {
459 recv(ticker) -> _ => if let Err(e) = this.trigger_flush() {
460 eprintln!("flush failed: {}", e);
461 },
462 recv(rx) -> _ => return
463 }
464 }
465 });
466 Ok(Some(handle))
467 }
468}