lsm_tree/compaction/leveled.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    segment::Segment,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    version::{run::Ranged, Run, Version},
    HashSet, KeyRange, SegmentId,
};

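/// Returns the aggregate key range of a run.
///
/// Assumes the run is sorted and its segments are disjoint, so the first
/// segment's min key and the last segment's max key bound every key in the run.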
pub fn aggregate_run_key_range(segments: &[Segment]) -> KeyRange {
    let lo = segments.first().expect("run should never be empty");
    let hi = segments.last().expect("run should never be empty");
    KeyRange::new((lo.key_range().min().clone(), hi.key_range().max().clone()))
}

/// Tries to find the optimal compaction set from one level into the next.
///
/// Returns the IDs of the segments to compact and whether they can be
/// trivially moved.
fn pick_minimal_compaction(
    curr_run: &Run<Segment>,
    next_run: Option<&Run<Segment>>,
    hidden_set: &HiddenSet,
    overshoot: u64,
    segment_base_size: u64,
) -> Option<(HashSet<SegmentId>, bool)> {
    // NOTE: Find the largest trivial move (if one exists)
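    //
    // A trivial move re-parents segments into the next level without
    // rewriting any data, so it is only valid if the window overlaps
    // nothing in the next level (or there is no next run at all).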
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Segment::id)) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Segment::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap compactions at 50x the segment base size for now
                //
                // At this point, all compactions are too large anyway,
                // so we can escape early
                let next_level_size = window.iter().map(Segment::file_size).sum::<u64>();
                next_level_size <= (50 * segment_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Segment::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull all contained segments in the current level into the compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in
                    .iter()
                    .map(Segment::file_size)
                    .sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Segment::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let next_level_size = window.iter().map(Segment::file_size).sum::<u64>();

                // let compaction_bytes = curr_level_size + next_level_size;

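                // NOTE: Write amplification here is the ratio of next-level
                // bytes that must be rewritten to current-level bytes moved
                // down; lower is cheaper.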
                #[allow(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Segment::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Segment::id));
                (ids, false)
            })
    } else {
        None
    }
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln for n >= 1 has a target size of `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
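///
/// # Example
///
/// A minimal sketch of configuring the strategy (the values shown are
/// illustrative, not recommendations):
///
/// ```ignore
/// let strategy = Strategy {
///     l0_threshold: 4,
///     target_size: 64 * 1_024 * 1_024, // 64 MiB
///     level_ratio: 10,
/// };
/// ```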
#[derive(Clone)]
pub struct Strategy {
    /// When the number of segments in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target segment size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the exponential growth factor from one level to the next.
    ///
    /// A level's target size is `level_base_size * level_ratio^(n - 1)` for level Ln.
    ///
    /// Default = 10
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio: 10,
        }
    }
}

impl Strategy {
    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
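    ///
    /// With the default configuration (`target_size` = 64 MiB,
    /// `l0_threshold` = 4, `level_ratio` = 10), the base size is
    /// 64 MiB * 4 = 256 MiB, so L1 targets 256 MiB, L2 targets 2.5 GiB,
    /// L3 targets 25 GiB, and so on.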
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        let power = (self.level_ratio as usize).pow(u32::from(canonical_level_idx) - 1) as u64;

        power * self.level_base_size()
    }

    fn level_base_size(&self) -> u64 {
        u64::from(self.target_size) * u64::from(self.l0_threshold)
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledCompaction"
    }

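    /// Scores each level by comparing its size (excluding segments that are
    /// already being compacted) against its target size, then compacts the
    /// level with the highest score into the next level.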
    #[allow(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[allow(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level index to the canonical one
        let mut level_shift = canonical_l1_idx - 1;
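        // e.g. if the first non-empty level below L0 is at index 3, it acts
        // as the canonical L1 and `level_shift` is 2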

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Exclude bytes that are already being compacted,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Segment::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move L1 up one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Scoring
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // Score first level

            // NOTE: We always have at least one level
            #[allow(clippy::expect_used)]
            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.segment_count() >= usize::from(self.l0_threshold) {
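                // e.g. 8 segments with an L0 threshold of 4 yield a score of 2.0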
                let ratio = (first_level.segment_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Exclude bytes that are already being compacted,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Segment::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check for level length above
                #[allow(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move if the next level is empty
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax (there is no next level to compact into)
            //
            // NOTE: We check for level length above
            #[allow(clippy::indexing_slicing)]
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut segment_ids: HashSet<u64> = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping segments in next level
            let target_level_overlapping_segment_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Segment::id)
                .collect();

            segment_ids.extend(&target_level_overlapping_segment_ids);

            let choice = CompactionInput {
                segment_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: u64::from(self.target_size),
            };

            /* eprintln!(
                "merge {} segments, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            if target_level_overlapping_segment_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[allow(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((segment_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            u64::from(self.target_size),
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            segment_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: u64::from(self.target_size),
        };

        /* eprintln!(
            "{} {} segments, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}

/*
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            SegmentInner,
        },
        super_segment::Segment,
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        todo!()

        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into() */
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
*/