lsm_tree/compaction/
leveled.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7    config::Config,
8    level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
9    segment::Segment,
10    windows::{GrowingWindowsExt, ShrinkingWindowsExt},
11    HashSet, KeyRange, SegmentId,
12};
13
14// TODO: for a disjoint set of segments, we could just take the first and last segment and use their first and last key respectively
15/// Aggregates the key range of a list of segments.
16fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
17    KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
18}
19
/// Tries to find the most optimal compaction set from
/// one level into the other.
///
/// Returns the IDs of the segments to compact, plus a flag indicating
/// whether the compaction can be performed as a trivial move
/// (segments only exist in the current level, so no rewrite is needed).
/// Returns `None` if no valid candidate exists.
fn pick_minimal_compaction(
    curr_level: &Level,
    next_level: &Level,
    hidden_set: &HiddenSet,
) -> Option<(HashSet<SegmentId>, bool)> {
    // assert!(curr_level.is_disjoint, "Lx is not disjoint");
    // assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

    // A candidate compaction, scored by its estimated write amplification.
    struct Choice {
        write_amp: f32,
        segment_ids: HashSet<SegmentId>,
        can_trivial_move: bool,
    }

    let mut choices = vec![];

    // Registers a candidate choice, discarding it if it is invalid.
    let mut add_choice = |choice: Choice| {
        let mut valid_choice = true;

        // IMPORTANT: Compaction is blocked because of other
        // on-going compaction
        valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));

        // NOTE: Keep compactions with 25 or less segments
        // to make compactions not too large
        valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;

        if valid_choice {
            choices.push(choice);
        }
    };

    // Consider merging each window of next-level segments together
    // with the current-level segments it pulls in.
    for window in next_level.growing_windows() {
        if hidden_set.is_blocked(window.iter().map(Segment::id)) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            continue;
        }

        let key_range = aggregate_key_range(window);

        // Pull in all segments in current level into compaction
        let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
            // IMPORTANT: Avoid "infectious spread" of key ranges
            // Imagine these levels:
            //
            //      A     B     C     D     E     F
            // L1 | ----- ----- ----- ----- ----- -----
            // L2 |    -----  -----  ----- ----- -----
            //         1      2      3     4     5
            //
            // If we took 1, we would also have to include B,
            // but then we would also have to include 2,
            // but then we would also have to include C,
            // but then we would also have to include 3,
            // ...
            //
            // Instead, we consider a window like 1 - 3
            // and then take B & C, because they are *contained* in that range
            // Not including A or D is fine, because we are not shadowing data unexpectedly
            curr_level.contained_segments(&key_range).collect()
        } else {
            // If the level is not disjoint, we just merge everything that overlaps
            // to try and "repair" the level
            curr_level.overlapping_segments(&key_range).collect()
        };

        if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            continue;
        }

        let curr_level_size = curr_level_pull_in
            .iter()
            .map(|x| x.metadata.file_size)
            .sum::<u64>();

        // NOTE: Only consider compactions where we actually reach the amount
        // of bytes we need to merge
        if curr_level_size >= 1 {
            let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

            let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
            segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));

            // Estimated write amplification: bytes rewritten in the next
            // level per byte moved down from the current level
            let write_amp = (next_level_size as f32) / (curr_level_size as f32);

            add_choice(Choice {
                write_amp,
                segment_ids,
                can_trivial_move: false,
            });
        }
    }

    // NOTE: Find largest trivial move (if it exists)
    // A trivial move has write_amp 0.0, so it always wins the
    // minimum-write-amp selection below if one is found.
    for window in curr_level.shrinking_windows() {
        let key_range = aggregate_key_range(window);

        if next_level.overlapping_segments(&key_range).next().is_none() {
            add_choice(Choice {
                write_amp: 0.0,
                segment_ids: window.iter().map(Segment::id).collect(),
                can_trivial_move: true,
            });
            break;
        }
    }

    // Pick the candidate with the lowest estimated write amplification
    let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
        a.write_amp
            .partial_cmp(&b.write_amp)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
}
140
/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^n` segments.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone)]
pub struct Strategy {
    /// When the number of segments in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target segment size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// A level target size is: max_memtable_size * level_ratio.pow(#level + 1).
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}
178
179impl Default for Strategy {
180    fn default() -> Self {
181        Self {
182            l0_threshold: 4,
183            target_size:/* 64 Mib */ 64 * 1_024 * 1_024,
184            level_ratio: 10,
185        }
186    }
187}
188
189impl Strategy {
190    /// Calculates the level target size.
191    ///
192    /// L1 = `level_base_size`
193    ///
194    /// L2 = `level_base_size * ratio`
195    ///
196    /// L3 = `level_base_size * ratio * ratio`
197    /// ...
198    fn level_target_size(&self, level_idx: u8) -> u64 {
199        assert!(level_idx >= 1, "level_target_size does not apply to L0");
200
201        let power = (self.level_ratio as usize).pow(u32::from(level_idx) - 1);
202
203        (power * (self.level_base_size() as usize)) as u64
204    }
205
206    fn level_base_size(&self) -> u64 {
207        u64::from(self.target_size) * u64::from(self.l0_threshold)
208    }
209}
210
impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledStrategy"
    }

    #[allow(clippy::too_many_lines)]
    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
        let view = &levels.levels;

        // TODO: look at L1+, if not disjoint
        // TODO: try to repairing level by rewriting
        // TODO: abort if any segment is hidden
        // TODO: then make sure, non-disjoint levels cannot be used in subsequent code below
        // TODO: add tests

        // L1+ compactions
        //
        // Walk levels from deepest to shallowest, excluding L0 (handled below)
        // and the last level (which has no next level to compact into).
        //
        // NOTE(review): `view.len() - 2` underflows if the manifest has fewer
        // than 2 levels — presumably the level count is always >= 2; verify.
        for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
        {
            // NOTE: Level count is 255 max
            #[allow(clippy::cast_possible_truncation)]
            let curr_level_index = curr_level_index as u8;

            let next_level_index = curr_level_index + 1;

            if level.is_empty() {
                continue;
            }

            let level_size: u64 = level
                .segments
                .iter()
                // NOTE: Take bytes that are already being compacted into account,
                // otherwise we may be overcompensating
                .filter(|x| !levels.hidden_set().is_hidden(x.id()))
                .map(|x| x.metadata.file_size)
                .sum();

            let desired_bytes = self.level_target_size(curr_level_index);

            let overshoot = level_size.saturating_sub(desired_bytes);

            if overshoot > 0 {
                let Some(next_level) = &view.get(next_level_index as usize) else {
                    break;
                };

                let Some((segment_ids, can_trivial_move)) =
                    pick_minimal_compaction(level, next_level, levels.hidden_set())
                else {
                    break;
                };

                // eprintln!(
                //     "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
                //     segment_ids.len(),
                //     next_level_index - 1,
                // );

                let choice = CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::from(self.target_size),
                };

                // TODO: eventually, this should happen lazily
                // if a segment file lives for very long, it should get rewritten
                // Rocks, by default, rewrites files that are 1 month or older
                //
                // TODO: 3.0.0 configuration?
                // NOTE: We purposefully not trivially move segments
                // if we go from L1 to L2
                // https://github.com/fjall-rs/lsm-tree/issues/63
                let goes_into_cold_storage = next_level_index == 2;

                if goes_into_cold_storage {
                    return Choice::Merge(choice);
                }

                if can_trivial_move && level.is_disjoint {
                    return Choice::Move(choice);
                }
                return Choice::Merge(choice);
            }
        }

        // L0->L1 compactions
        {
            let busy_levels = levels.busy_levels();

            let Some(first_level) = view.first() else {
                return Choice::DoNothing;
            };

            if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
                let first_level_size = first_level.size();

                // NOTE: Special handling for disjoint workloads
                if levels.is_disjoint() {
                    if first_level_size < self.target_size.into() {
                        // TODO: also do this in non-disjoint workloads
                        // -> intra-L0 compaction

                        // NOTE: Force a merge into L0 itself
                        // ...we seem to have *very* small flushes
                        return if first_level.len() >= 32 {
                            Choice::Merge(CompactionInput {
                                dest_level: 0,
                                segment_ids: first_level.list_ids(),
                                // NOTE: Allow a bit of overshooting
                                target_size: ((self.target_size as f32) * 1.1) as u64,
                            })
                        } else {
                            Choice::DoNothing
                        };
                    }

                    return Choice::Merge(CompactionInput {
                        dest_level: 1,
                        segment_ids: first_level.list_ids(),
                        target_size: ((self.target_size as f32) * 1.1) as u64,
                    });
                }

                if first_level_size < self.target_size.into() {
                    // NOTE: We reached the threshold, but L0 is still very small
                    // meaning we have very small segments, so do intra-L0 compaction
                    return Choice::Merge(CompactionInput {
                        dest_level: 0,
                        segment_ids: first_level.list_ids(),
                        target_size: self.target_size.into(),
                    });
                }

                if !busy_levels.contains(&1) {
                    // Sort a copy of L0 by key range so its overall
                    // key range can be aggregated below
                    let mut level = (**first_level).clone();
                    level.sort_by_key_range();

                    let Some(next_level) = &view.get(1) else {
                        return Choice::DoNothing;
                    };

                    let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();

                    // Get overlapping segments in next level
                    let key_range = aggregate_key_range(&level);

                    let next_level_overlapping_segment_ids: Vec<_> = next_level
                        .overlapping_segments(&key_range)
                        .map(Segment::id)
                        .collect();

                    segment_ids.extend(&next_level_overlapping_segment_ids);

                    let choice = CompactionInput {
                        segment_ids,
                        dest_level: 1,
                        target_size: u64::from(self.target_size),
                    };

                    // If nothing in L1 overlaps and L0 is disjoint,
                    // the L0 segments can simply be moved down
                    if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
                        return Choice::Move(choice);
                    }
                    return Choice::Merge(choice);
                }
            }
        }

        Choice::DoNothing
    }
}
381
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    /// Builds a key range from two string keys.
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    /// Creates a dummy segment with the given ID, key range,
    /// file size (in bytes) and tombstone ratio.
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            // All offsets are zero — the fixture is never read from disk
            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into()
    }

    /// Builds a level manifest from a recipe: one inner `Vec` per level,
    /// each entry being `(segment ID, min key, max key, size in MiB)`.
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    /// Empty levels should trigger no compaction.
    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    /// L0 at threshold with target-sized segments should merge into L1;
    /// hiding a segment (on-going compaction) should block the merge.
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    /// L0 at threshold but with very small segments should trigger
    /// intra-L0 compaction (dest_level = 0).
    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    /// L0->L1 merge where L1 has no overlapping key ranges:
    /// only the L0 segments are part of the compaction input.
    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    /// L0->L1 merge where some L1 segments overlap L0's key range:
    /// the overlapping L1 segments are pulled into the compaction.
    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    /// An oversized L1 (as after converting from a tiered layout)
    /// should be merged down into L2 together with overlapping segments.
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}