lsm_tree/compaction/leveled.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    config::Config,
    key_range::KeyRange,
    level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
    segment::Segment,
    windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    HashSet, SegmentId,
};

/// Aggregates the key range of a list of segments.
fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
    KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
}

/// Tries to find the optimal compaction set from
/// one level into the next.
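///
/// Returns the IDs of the segments to compact, plus a flag indicating
/// whether they can be trivially moved (i.e. they overlap nothing in
/// the next level), or `None` if no unblocked choice exists.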
fn pick_minimal_compaction(
    curr_level: &Level,
    next_level: &Level,
    hidden_set: &HiddenSet,
) -> Option<(HashSet<SegmentId>, bool)> {
    // assert!(curr_level.is_disjoint, "Lx is not disjoint");
    // assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

    struct Choice {
        write_amp: f32,
        segment_ids: HashSet<SegmentId>,
        can_trivial_move: bool,
    }

    let mut choices = vec![];

    let mut add_choice = |choice: Choice| {
        let mut valid_choice = true;

        // IMPORTANT: Compaction is blocked because of another
        // ongoing compaction
        valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));

        // NOTE: Keep compactions at 25 segments or fewer,
        // so compactions do not become too large
        valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;

        if valid_choice {
            choices.push(choice);
        }
    };

    for window in next_level.growing_windows() {
        if hidden_set.is_blocked(window.iter().map(Segment::id)) {
            // IMPORTANT: Compaction is blocked because of another
            // ongoing compaction
            continue;
        }

        let key_range = aggregate_key_range(window);

        // Pull in segments from the current level into the compaction
        let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
            // IMPORTANT: Avoid "infectious spread" of key ranges
            // Imagine these levels:
            //
            //      A     B     C     D     E     F
            // L1 | ----- ----- ----- ----- ----- -----
            // L2 |    -----  -----  ----- ----- -----
            //         1      2      3     4     5
            //
            // If we took 1, we would also have to include B,
            // but then we would also have to include 2,
            // but then we would also have to include C,
            // but then we would also have to include 3,
            // ...
            //
            // Instead, we consider a window like 1 - 3
            // and then take B & C, because they are *contained* in that range
            // Not including A or D is fine, because we are not shadowing data unexpectedly
            curr_level.contained_segments(&key_range).collect()
        } else {
            // If the level is not disjoint, we just merge everything that overlaps
            // to try and "repair" the level
            curr_level.overlapping_segments(&key_range).collect()
        };

        if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
            // IMPORTANT: Compaction is blocked because of another
            // ongoing compaction
            continue;
        }

        let curr_level_size = curr_level_pull_in
            .iter()
            .map(|x| x.metadata.file_size)
            .sum::<u64>();

        // NOTE: Only consider compactions where the current level
        // actually contributes bytes to merge
        if curr_level_size >= 1 {
            let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

            let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
            segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));

            let write_amp = (next_level_size as f32) / (curr_level_size as f32);
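            // e.g. pulling 64 MiB out of the current level that overlaps
            // 256 MiB in the next level yields a write_amp estimate of 4.0;
            // lower is better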

            add_choice(Choice {
                write_amp,
                segment_ids,
                can_trivial_move: false,
            });
        }
    }

    // NOTE: Find largest trivial move (if it exists)
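    // Shrinking windows go from widest to narrowest, so the first
    // window with no overlap in the next level is the largest
    // possible trivial move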
    for window in curr_level.shrinking_windows() {
        let key_range = aggregate_key_range(window);

        if next_level.overlapping_segments(&key_range).next().is_none() {
            add_choice(Choice {
                write_amp: 0.0,
                segment_ids: window.iter().map(Segment::id).collect(),
                can_trivial_move: true,
            });
            break;
        }
    }

    let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
        a.write_amp
            .partial_cmp(&b.write_amp)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln (for n >= 1) has a target size of `level_base_size * ratio^(n-1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and low space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
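///
/// # Examples
///
/// A minimal configuration sketch (the `Leveled` re-export name is an
/// assumption; adjust the import to the actual module path):
///
/// ```ignore
/// use lsm_tree::compaction::Leveled as Strategy;
///
/// // 64 MiB segments, compact L0 once it holds 4 segments, 10x fanout
/// let strategy = Strategy {
///     l0_threshold: 4,
///     target_size: 64 * 1_024 * 1_024,
///     level_ratio: 10,
/// };
/// ```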
#[derive(Clone)]
pub struct Strategy {
    /// When the number of segments in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target segment size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// A level's target size is `level_base_size * level_ratio.pow(level_idx - 1)`.
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio: 10,
        }
    }
}

impl Strategy {
    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    /// ...
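    ///
    /// With the defaults (`target_size` = 64 MiB, `l0_threshold` = 4,
    /// `level_ratio` = 10), `level_base_size` is 256 MiB, so the targets
    /// are L1 = 256 MiB, L2 = 2.5 GiB, L3 = 25 GiB, ...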
    fn level_target_size(&self, level_idx: u8) -> u64 {
        assert!(level_idx >= 1, "level_target_size does not apply to L0");

        let power = (self.level_ratio as usize).pow(u32::from(level_idx) - 1);

        (power * (self.level_base_size() as usize)) as u64
    }

    fn level_base_size(&self) -> u64 {
        self.target_size as u64 * self.l0_threshold as u64
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledStrategy"
    }

    #[allow(clippy::too_many_lines)]
    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
        let view = &levels.levels;

        // L1+ compactions
        for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
        {
            // NOTE: Level count is 255 max
            #[allow(clippy::cast_possible_truncation)]
            let curr_level_index = curr_level_index as u8;

            let next_level_index = curr_level_index + 1;

            if level.is_empty() {
                continue;
            }

            let level_size: u64 = level
                .segments
                .iter()
                // NOTE: Skip bytes that are already being compacted,
                // otherwise we would be overcompensating
                .filter(|x| !levels.hidden_set().is_hidden(x.id()))
                .map(|x| x.metadata.file_size)
                .sum();

            let desired_bytes = self.level_target_size(curr_level_index);

            let overshoot = level_size.saturating_sub(desired_bytes);
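            // e.g. with the default configuration, L1 targets 256 MiB,
            // so a 300 MiB L1 overshoots by 44 MiB and becomes a
            // compaction candidate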

            if overshoot > 0 {
                let Some(next_level) = &view.get(next_level_index as usize) else {
                    break;
                };

                let Some((segment_ids, can_trivial_move)) =
                    pick_minimal_compaction(level, next_level, levels.hidden_set())
                else {
                    break;
                };

                // eprintln!(
                //     "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
                //     segment_ids.len(),
                //     next_level_index - 1,
                // );

                let choice = CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::from(self.target_size),
                };

                // TODO: eventually, this should happen lazily
                // if a segment file lives for very long, it should get rewritten
                // RocksDB, by default, rewrites files that are 1 month or older
                //
                // TODO: 3.0.0 configuration?
                // NOTE: We purposefully do not trivially move segments
                // if we go from L1 to L2
                // https://github.com/fjall-rs/lsm-tree/issues/63
                let goes_into_cold_storage = next_level_index == 2;

                if goes_into_cold_storage {
                    return Choice::Merge(choice);
                }

                if can_trivial_move && level.is_disjoint {
                    return Choice::Move(choice);
                }
                return Choice::Merge(choice);
            }
        }

        // L0->L1 compactions
        {
            let busy_levels = levels.busy_levels();

            let Some(first_level) = view.first() else {
                return Choice::DoNothing;
            };

            if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
                let first_level_size = first_level.size();

                // NOTE: Special handling for disjoint workloads
                if levels.is_disjoint() {
                    if first_level_size < self.target_size.into() {
                        // TODO: also do this in non-disjoint workloads
                        // -> intra-L0 compaction

                        // NOTE: Force a merge into L0 itself
                        // ...we seem to have *very* small flushes
                        return if first_level.len() >= 32 {
                            Choice::Merge(CompactionInput {
                                dest_level: 0,
                                segment_ids: first_level.list_ids(),
                                // NOTE: Allow a bit of overshooting
                                target_size: ((self.target_size as f32) * 1.1) as u64,
                            })
                        } else {
                            Choice::DoNothing
                        };
                    }

                    return Choice::Merge(CompactionInput {
                        dest_level: 1,
                        segment_ids: first_level.list_ids(),
                        target_size: ((self.target_size as f32) * 1.1) as u64,
                    });
                }

                if first_level_size < self.target_size.into() {
                    // NOTE: We reached the threshold, but L0 is still very small,
                    // meaning we have very small segments, so do intra-L0 compaction
                    return Choice::Merge(CompactionInput {
                        dest_level: 0,
                        segment_ids: first_level.list_ids(),
                        target_size: self.target_size.into(),
                    });
                }

                if !busy_levels.contains(&1) {
                    let mut level = (**first_level).clone();
                    level.sort_by_key_range();

                    let Some(next_level) = &view.get(1) else {
                        return Choice::DoNothing;
                    };

                    let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();

                    // Get overlapping segments in next level
                    let key_range = aggregate_key_range(&level);

                    let next_level_overlapping_segment_ids: Vec<_> = next_level
                        .overlapping_segments(&key_range)
                        .map(Segment::id)
                        .collect();

                    segment_ids.extend(&next_level_overlapping_segment_ids);

                    let choice = CompactionInput {
                        segment_ids,
                        dest_level: 1,
                        target_size: u64::from(self.target_size),
                    };

                    if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
                        return Choice::Move(choice);
                    }
                    return Choice::Merge(choice);
                }
            }
        }

        Choice::DoNothing
    }
}

#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        key_range::KeyRange,
        level_manifest::LevelManifest,
        segment::{
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            value_block::BlockOffset,
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet,
    };
    use std::{path::Path, sync::Arc};
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
        }
        .into()
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
671}