// lsm_tree/compaction/leveled.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7    compaction::state::{hidden_set::HiddenSet, CompactionState},
8    config::Config,
9    segment::Segment,
10    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
11    version::{run::Ranged, Run, Version},
12    HashSet, KeyRange, SegmentId,
13};
14
15pub fn aggregate_run_key_range(segments: &[Segment]) -> KeyRange {
16    let lo = segments.first().expect("run should never be empty");
17    let hi = segments.last().expect("run should never be empty");
18    KeyRange::new((lo.key_range().min().clone(), hi.key_range().max().clone()))
19}
20
21/// Tries to find the most optimal compaction set from one level into the other.
22fn pick_minimal_compaction(
23    curr_run: &Run<Segment>,
24    next_run: Option<&Run<Segment>>,
25    hidden_set: &HiddenSet,
26    overshoot: u64,
27    segment_base_size: u64,
28) -> Option<(HashSet<SegmentId>, bool)> {
29    // NOTE: Find largest trivial move (if it exists)
30    if let Some(window) = curr_run.shrinking_windows().find(|window| {
31        if hidden_set.is_blocked(window.iter().map(Segment::id)) {
32            // IMPORTANT: Compaction is blocked because of other
33            // on-going compaction
34            return false;
35        }
36
37        let Some(next_run) = &next_run else {
38            // No run in next level, so we can trivially move
39            return true;
40        };
41
42        let key_range = aggregate_run_key_range(window);
43
44        next_run.get_overlapping(&key_range).is_empty()
45    }) {
46        let ids = window.iter().map(Segment::id).collect();
47        return Some((ids, true));
48    }
49
50    // NOTE: Look for merges
51    if let Some(next_run) = &next_run {
52        next_run
53            .growing_windows()
54            .take_while(|window| {
55                // Cap at 50x segments per compaction for now
56                //
57                // At this point, all compactions are too large anyway
58                // so we can escape early
59                let next_level_size = window.iter().map(Segment::file_size).sum::<u64>();
60                next_level_size <= (50 * segment_base_size)
61            })
62            .filter_map(|window| {
63                if hidden_set.is_blocked(window.iter().map(Segment::id)) {
64                    // IMPORTANT: Compaction is blocked because of other
65                    // on-going compaction
66                    return None;
67                }
68
69                let key_range = aggregate_run_key_range(window);
70
71                // Pull in all contained segments in current level into compaction
72                let curr_level_pull_in = curr_run.get_contained(&key_range);
73
74                let curr_level_size = curr_level_pull_in
75                    .iter()
76                    .map(Segment::file_size)
77                    .sum::<u64>();
78
79                // if curr_level_size < overshoot {
80                //     return None;
81                // }
82
83                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Segment::id)) {
84                    // IMPORTANT: Compaction is blocked because of other
85                    // on-going compaction
86                    return None;
87                }
88
89                let next_level_size = window.iter().map(Segment::file_size).sum::<u64>();
90
91                //  let compaction_bytes = curr_level_size + next_level_size;
92
93                #[allow(clippy::cast_precision_loss)]
94                let write_amp = (next_level_size as f32) / (curr_level_size as f32);
95
96                Some((window, curr_level_pull_in, write_amp))
97            })
98            // Find the compaction with the smallest write amplification factor
99            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
100            .map(|(window, curr_level_pull_in, _)| {
101                let mut ids: HashSet<_> = window.iter().map(Segment::id).collect();
102                ids.extend(curr_level_pull_in.iter().map(Segment::id));
103                (ids, false)
104            })
105    } else {
106        None
107    }
108}
109
110/// Levelled compaction strategy (LCS)
111///
112/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
113///
114/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^(n - 1)` segments.
115///
116/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
117///
118/// LCS is the recommended compaction strategy to use.
119///
120/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^(n - 1)` segments.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Strategy {
    /// When the number of segments in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target segment size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// A level target size is: max_memtable_size * level_ratio.pow(#level + 1).
    ///
    /// Default = 10
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio: 10,
        }
    }
}

impl Strategy {
    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
    ///
    /// # Panics
    ///
    /// Panics if `canonical_level_idx` is 0; L0 is triggered by segment count,
    /// not by size.
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        // NOTE: Compute in u64 directly, so the intermediate power cannot
        // overflow usize on 32-bit targets
        let power = u64::from(self.level_ratio).pow(u32::from(canonical_level_idx) - 1);

        power * self.level_base_size()
    }

    /// The target size of L1 (the base level): one full L0's worth of data.
    fn level_base_size(&self) -> u64 {
        u64::from(self.target_size) * u64::from(self.l0_threshold)
    }
}
185
impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledCompaction"
    }

    /// Chooses the next compaction:
    ///
    /// 1. Locates the "canonical L1" (the first non-empty level after L0),
    ///    possibly opening a new level above it if all populated levels overflow.
    /// 2. Scores L0 by segment count and L1+ by size overshoot vs. target size.
    /// 3. For the highest-scoring level, builds either an L0->L1 merge/move or
    ///    an L1+ compaction via `pick_minimal_compaction`.
    #[allow(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[allow(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1)(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level idx to the canonical
        let mut level_shift = canonical_l1_idx - 1;

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            // A "new L1" is needed when every populated level already exceeds
            // its target size, so compacting within them cannot help
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Take bytes that are already being compacted into account,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Segment::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move up L1 one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Scoring
        //
        // One (score, overshoot-in-bytes) pair per level; a score >= 1.0
        // means the level is over its trigger/target
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // Score first level

            // NOTE: We always have at least one level
            #[allow(clippy::expect_used)]
            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.segment_count() >= usize::from(self.l0_threshold) {
                // L0 is scored by segment count relative to the threshold,
                // not by byte size
                let ratio = (first_level.segment_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Take bytes that are already being compacted into account,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Segment::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check for level length above
                #[allow(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move
                    // (sentinel score/overshoot; the next level is empty,
                    // so moving down is essentially free)
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            //
            // NOTE: We check for level length above
            #[allow(clippy::indexing_slicing)]
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction: pick the level with the highest score
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        // No level is over its trigger/target -> nothing to do
        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            // L0 compaction merges ALL of L0, so both levels must be fully idle
            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut segment_ids: HashSet<u64> = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping segments in next level
            let target_level_overlapping_segment_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Segment::id)
                .collect();

            segment_ids.extend(&target_level_overlapping_segment_ids);

            let choice = CompactionInput {
                segment_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: u64::from(self.target_size),
            };

            /* eprintln!(
                "merge {} segments, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            // Nothing overlaps below and L0 itself is disjoint -> trivial move
            if target_level_overlapping_segment_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[allow(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        // NOTE(review): L1+ levels are assumed to hold exactly one run each —
        // the expect below relies on that invariant
        let Some((segment_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            u64::from(self.target_size),
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            segment_ids,
            dest_level: next_level_index,
            // Translate the physical destination level back into canonical
            // numbering for the compaction input
            canonical_level: next_level_index - (level_shift as u8),
            target_size: u64::from(self.target_size),
        };

        /* eprintln!(
            "{} {} segments, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}
409
410/*
411#[cfg(test)]
412mod tests {
413    use super::{Choice, Strategy};
414    use crate::{
415        cache::Cache,
416        compaction::{CompactionStrategy, Input as CompactionInput},
417        descriptor_table::FileDescriptorTable,
418        level_manifest::LevelManifest,
419        segment::{
420            block::offset::BlockOffset,
421            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
422            file_offsets::FileOffsets,
423            meta::{Metadata, SegmentId},
424            SegmentInner,
425        },
426        super_segment::Segment,
427        time::unix_timestamp,
428        Config, HashSet, KeyRange,
429    };
430    use std::{
431        path::Path,
432        sync::{atomic::AtomicBool, Arc},
433    };
434    use test_log::test;
435
436    fn string_key_range(a: &str, b: &str) -> KeyRange {
437        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
438    }
439
440    #[allow(
441        clippy::expect_used,
442        clippy::cast_possible_truncation,
443        clippy::cast_sign_loss
444    )]
445    fn fixture_segment(
446        id: SegmentId,
447        key_range: KeyRange,
448        size: u64,
449        tombstone_ratio: f32,
450    ) -> Segment {
451        todo!()
452
453        /*   let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
454
455        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
456        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
457
458        SegmentInner {
459            tree_id: 0,
460            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
461            block_index,
462
463            offsets: FileOffsets {
464                bloom_ptr: BlockOffset(0),
465                range_filter_ptr: BlockOffset(0),
466                index_block_ptr: BlockOffset(0),
467                metadata_ptr: BlockOffset(0),
468                range_tombstones_ptr: BlockOffset(0),
469                tli_ptr: BlockOffset(0),
470                pfx_ptr: BlockOffset(0),
471            },
472
473            metadata: Metadata {
474                data_block_count: 0,
475                index_block_count: 0,
476                data_block_size: 4_096,
477                index_block_size: 4_096,
478                created_at: unix_timestamp().as_nanos(),
479                id,
480                file_size: size,
481                compression: crate::segment::meta::CompressionType::None,
482                table_type: crate::segment::meta::TableType::Block,
483                item_count: 1_000_000,
484                key_count: 0,
485                key_range,
486                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
487                range_tombstone_count: 0,
488                uncompressed_size: 0,
489                seqnos: (0, 0),
490            },
491            cache,
492
493            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
494
495            path: "a".into(),
496            is_deleted: AtomicBool::default(),
497        }
498        .into() */
499    }
500
501    #[allow(clippy::expect_used)]
502    fn build_levels(
503        path: &Path,
504        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
505    ) -> crate::Result<LevelManifest> {
506        let mut levels = LevelManifest::create_new(
507            recipe.len().try_into().expect("oopsie"),
508            path.join("levels"),
509        )?;
510
511        for (idx, level) in recipe.into_iter().enumerate() {
512            for (id, min, max, size_mib) in level {
513                levels.insert_into_level(
514                    idx.try_into().expect("oopsie"),
515                    fixture_segment(
516                        id,
517                        string_key_range(min, max),
518                        size_mib * 1_024 * 1_024,
519                        0.0,
520                    ),
521                );
522            }
523        }
524
525        Ok(levels)
526    }
527
528    #[test]
529    fn leveled_empty_levels() -> crate::Result<()> {
530        let tempdir = tempfile::tempdir()?;
531        let compactor = Strategy::default();
532
533        #[rustfmt::skip]
534        let levels = build_levels(tempdir.path(), vec![
535            vec![],
536            vec![],
537            vec![],
538            vec![],
539        ])?;
540
541        assert_eq!(
542            compactor.choose(&levels, &Config::default()),
543            Choice::DoNothing
544        );
545
546        Ok(())
547    }
548
549    #[test]
550    fn leveled_default_l0() -> crate::Result<()> {
551        let tempdir = tempfile::tempdir()?;
552        let compactor = Strategy {
553            target_size: 64 * 1_024 * 1_024,
554            ..Default::default()
555        };
556
557        #[rustfmt::skip]
558        let mut levels = build_levels(tempdir.path(), vec![
559            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
560            vec![],
561            vec![],
562            vec![],
563        ])?;
564
565        assert_eq!(
566            compactor.choose(&levels, &Config::default()),
567            Choice::Merge(CompactionInput {
568                dest_level: 1,
569                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
570                target_size: 64 * 1_024 * 1_024
571            })
572        );
573
574        levels.hide_segments(std::iter::once(4));
575
576        assert_eq!(
577            compactor.choose(&levels, &Config::default()),
578            Choice::DoNothing
579        );
580
581        Ok(())
582    }
583
584    #[test]
585    #[allow(
586        clippy::cast_sign_loss,
587        clippy::cast_precision_loss,
588        clippy::cast_possible_truncation
589    )]
590    fn leveled_intra_l0() -> crate::Result<()> {
591        let tempdir = tempfile::tempdir()?;
592        let compactor = Strategy {
593            target_size: 64 * 1_024 * 1_024,
594            ..Default::default()
595        };
596
597        #[rustfmt::skip]
598        let mut levels = build_levels(tempdir.path(), vec![
599            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
600            vec![],
601            vec![],
602            vec![],
603        ])?;
604
605        assert_eq!(
606            compactor.choose(&levels, &Config::default()),
607            Choice::Merge(CompactionInput {
608                dest_level: 0,
609                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
610                target_size: u64::from(compactor.target_size),
611            })
612        );
613
614        levels.hide_segments(std::iter::once(4));
615
616        assert_eq!(
617            compactor.choose(&levels, &Config::default()),
618            Choice::DoNothing
619        );
620
621        Ok(())
622    }
623
624    #[test]
625    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
626        let tempdir = tempfile::tempdir()?;
627        let compactor = Strategy {
628            target_size: 64 * 1_024 * 1_024,
629            ..Default::default()
630        };
631
632        #[rustfmt::skip]
633        let levels = build_levels(tempdir.path(), vec![
634            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
635            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
636            vec![],
637            vec![],
638        ])?;
639
640        assert_eq!(
641            compactor.choose(&levels, &Config::default()),
642            Choice::Merge(CompactionInput {
643                dest_level: 1,
644                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
645                target_size: 64 * 1_024 * 1_024
646            })
647        );
648
649        Ok(())
650    }
651
652    #[test]
653    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
654        let tempdir = tempfile::tempdir()?;
655        let compactor = Strategy {
656            target_size: 64 * 1_024 * 1_024,
657            ..Default::default()
658        };
659
660        #[rustfmt::skip]
661        let mut levels = build_levels(tempdir.path(), vec![
662            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
663            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
664            vec![],
665            vec![],
666        ])?;
667
668        assert_eq!(
669            compactor.choose(&levels, &Config::default()),
670            Choice::Merge(CompactionInput {
671                dest_level: 1,
672                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
673                target_size: 64 * 1_024 * 1_024
674            })
675        );
676
677        levels.hide_segments(std::iter::once(5));
678        assert_eq!(
679            compactor.choose(&levels, &Config::default()),
680            Choice::DoNothing
681        );
682
683        Ok(())
684    }
685
686    #[test]
687    fn levelled_from_tiered() -> crate::Result<()> {
688        let tempdir = tempfile::tempdir()?;
689        let compactor = Strategy {
690            target_size: 64 * 1_024 * 1_024,
691            ..Default::default()
692        };
693        let config = Config::default();
694
695        #[rustfmt::skip]
696        let levels = build_levels(tempdir.path(), vec![
697            vec![],
698            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
699            vec![(4, "a", "g", 64)],
700            vec![],
701        ])?;
702
703        assert_eq!(
704            compactor.choose(&levels, &config),
705            Choice::Merge(CompactionInput {
706                dest_level: 2,
707                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
708                target_size: 64 * 1_024 * 1_024
709            })
710        );
711
712        Ok(())
713    }
714}
715 */