lsm_tree/compaction/
leveled.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    config::Config,
    level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
    segment::Segment,
    windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    HashSet, KeyRange, SegmentId,
};

// TODO: for a disjoint set of segments, we could just take the first and last segment and use their first and last key respectively
/// Aggregates the key range of a list of segments.
fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
    KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
}
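
// A minimal sketch of the TODO above (hypothetical helper, not wired in
// anywhere): for a disjoint slice that is sorted by key range, aggregating
// only the first and last segment yields the same result in O(1)
#[allow(dead_code)]
fn aggregate_key_range_disjoint(segments: &[Segment]) -> KeyRange {
    debug_assert!(!segments.is_empty(), "segment slice should not be empty");

    KeyRange::aggregate(
        [segments.first(), segments.last()]
            .into_iter()
            .flatten()
            .map(|x| &x.metadata.key_range),
    )
}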

/// Tries to find the most efficient compaction set for
/// merging one level into the next.
///
/// Returns the IDs of the segments to compact, and whether
/// the set can be trivially moved (no overlap with the next level).
fn pick_minimal_compaction(
    curr_level: &Level,
    next_level: &Level,
    hidden_set: &HiddenSet,
    segment_base_size: u64,
) -> Option<(HashSet<SegmentId>, bool)> {
    // assert!(curr_level.is_disjoint, "Lx is not disjoint");
    // assert!(next_level.is_disjoint, "Lx+1 is not disjoint");

    struct Choice {
        write_amp: f32,
        segment_ids: HashSet<SegmentId>,
        can_trivial_move: bool,
    }

    let mut choices = vec![];

    let mut add_choice = |choice: Choice| {
        let mut valid_choice = true;

        // IMPORTANT: Compaction is blocked by another
        // ongoing compaction
        valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));

        // NOTE: Keep compactions to 25 or fewer segments
        // so that no single compaction grows too large
        valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;

        if valid_choice {
            choices.push(choice);
        }
    };

    for window in next_level.growing_windows() {
        let window_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();

        // Cap compactions at 50x the base segment size for now
        if window_size > (50 * segment_base_size) {
            // NOTE: Because windows only grow from here on, all following
            // compactions would be too large as well, so we can break out early
            break;
        }

        if hidden_set.is_blocked(window.iter().map(Segment::id)) {
            // IMPORTANT: Compaction is blocked by another
            // ongoing compaction
            continue;
        }

        let key_range = aggregate_key_range(window);

        // Pull in segments from the current level into the compaction
        let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
            // IMPORTANT: Avoid "infectious spread" of key ranges
            // Imagine these levels:
            //
            //      A     B     C     D     E     F
            // L1 | ----- ----- ----- ----- ----- -----
            // L2 |    -----  -----  ----- ----- -----
            //         1      2      3     4     5
            //
            // If we took 1, we would also have to include B,
            // but then we would also have to include 2,
            // but then we would also have to include C,
            // but then we would also have to include 3,
            // ...
            //
            // Instead, we consider a window like 1 - 3
            // and then take B & C, because they are *contained* in that range
            // Not including A or D is fine, because we are not shadowing data unexpectedly
            curr_level.contained_segments(&key_range).collect()
        } else {
            // If the level is not disjoint, we just merge everything that overlaps,
            // to try and "repair" the level
            curr_level.overlapping_segments(&key_range).collect()
        };

        if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
            // IMPORTANT: Compaction is blocked by another
            // ongoing compaction
            continue;
        }

        let curr_level_size = curr_level_pull_in
            .iter()
            .map(|x| x.metadata.file_size)
            .sum::<u64>();

        // NOTE: Only consider compactions that actually pull in
        // data from the current level
        if curr_level_size >= 1 {
            let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
            segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));

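            // For example, merging 64 MiB out of the current level into
            // 256 MiB of overlapping segments in the next level
            // yields a write amplification of 256 / 64 = 4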
            let write_amp = (window_size as f32) / (curr_level_size as f32);

            add_choice(Choice {
                write_amp,
                segment_ids,
                can_trivial_move: false,
            });
        }
    }

    // NOTE: Find the largest trivial move (if one exists)
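    // A trivial move rewrites no data: when a window of the current level
    // has no overlap with the next level at all, its segments can simply
    // be relinked into the next level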
    for window in curr_level.shrinking_windows() {
        let key_range = aggregate_key_range(window);

        if next_level.overlapping_segments(&key_range).next().is_none() {
            add_choice(Choice {
                write_amp: 0.0,
                segment_ids: window.iter().map(Segment::id).collect(),
                can_trivial_move: true,
            });
            break;
        }
    }

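    // NOTE: A trivial move carries a write amplification of 0.0,
    // so it is preferred over any real merge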
    let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
        a.write_amp
            .partial_cmp(&b.write_amp)
            .unwrap_or(std::cmp::Ordering::Equal)
    });

    minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping segments in the next level.
///
/// Each level Ln (n >= 1) can hold up to `level_base_size * ratio^(n-1)` bytes.
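///
/// For example, with the default config (64 MiB segment target size, L0 threshold
/// of 4, ratio of 10), L1 holds up to 256 MiB, L2 up to 2.5 GiB and L3 up to 25 GiB.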
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and low space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone)]
pub struct Strategy {
    /// When the number of segments in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target segment size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the growth factor from one
    /// level to the next.
    ///
    /// A level's target size is `level_base_size * level_ratio.pow(level_idx - 1)`.
    ///
    /// Default = 10
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio: 10,
        }
    }
}

impl Strategy {
    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    /// ...
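    ///
    /// With the default config, this works out to:
    ///
    /// ```text
    /// level_base_size = target_size * l0_threshold = 64 MiB * 4 = 256 MiB
    ///
    /// L1 = 256 MiB
    /// L2 = 256 MiB * 10   = 2.5 GiB
    /// L3 = 256 MiB * 10^2 = 25 GiB
    /// ```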
    fn level_target_size(&self, level_idx: u8) -> u64 {
        assert!(level_idx >= 1, "level_target_size does not apply to L0");

        let power = (self.level_ratio as usize).pow(u32::from(level_idx) - 1);

        (power * (self.level_base_size() as usize)) as u64
    }

    fn level_base_size(&self) -> u64 {
        u64::from(self.target_size) * u64::from(self.l0_threshold)
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledStrategy"
    }

    #[allow(clippy::too_many_lines)]
    fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
        let view = &levels.levels;

        // TODO: look at L1+, if not disjoint
        // TODO: try repairing the level by rewriting it
        // TODO: abort if any segment is hidden
        // TODO: then make sure non-disjoint levels cannot be used in the subsequent code below
        // TODO: add tests

        // L1+ compactions
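        // NOTE: Walk levels from the second-to-last one down to L1
        // (the last level is skipped because there is no next level
        // to compact it into)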
        for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
        {
            // NOTE: Level count is 255 max
            #[allow(clippy::cast_possible_truncation)]
            let curr_level_index = curr_level_index as u8;

            let next_level_index = curr_level_index + 1;

            if level.is_empty() {
                continue;
            }

            let level_size: u64 = level
                .segments
                .iter()
                // NOTE: Don't count segments that are already being compacted,
                // otherwise we may be overcompensating
                .filter(|x| !levels.hidden_set().is_hidden(x.id()))
                .map(|x| x.metadata.file_size)
                .sum();

            let desired_bytes = self.level_target_size(curr_level_index);

            let overshoot = level_size.saturating_sub(desired_bytes);

            if overshoot > 0 {
                let Some(next_level) = &view.get(next_level_index as usize) else {
                    break;
                };

                let Some((segment_ids, can_trivial_move)) = pick_minimal_compaction(
                    level,
                    next_level,
                    levels.hidden_set(),
                    u64::from(self.target_size),
                ) else {
                    break;
                };

                // eprintln!(
                //     "merge {} segments, L{}->L{next_level_index}: {segment_ids:?}",
                //     segment_ids.len(),
                //     next_level_index - 1,
                // );

                let choice = CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::from(self.target_size),
                };

                // TODO: eventually, this should happen lazily
                // if a segment file lives very long, it should get rewritten
                // RocksDB, by default, rewrites files that are 1 month or older
                //
                // TODO: 3.0.0 configuration?
                // NOTE: We purposefully do not trivially move segments
                // if we go from L1 to L2
                // https://github.com/fjall-rs/lsm-tree/issues/63
                let goes_into_cold_storage = next_level_index == 2;

                if goes_into_cold_storage {
                    return Choice::Merge(choice);
                }

                if can_trivial_move && level.is_disjoint {
                    return Choice::Move(choice);
                }
                return Choice::Merge(choice);
            }
        }

        // L0->L1 compactions
        {
            let busy_levels = levels.busy_levels();

            let Some(first_level) = view.first() else {
                return Choice::DoNothing;
            };

            if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
                let first_level_size = first_level.size();

                // NOTE: Special handling for disjoint workloads
                if levels.is_disjoint() {
                    if first_level_size < self.target_size.into() {
                        // TODO: also do this in non-disjoint workloads
                        // -> intra-L0 compaction

                        // NOTE: Force a merge into L0 itself
                        // ...we seem to have *very* small flushes
                        return if first_level.len() >= 32 {
                            Choice::Merge(CompactionInput {
                                dest_level: 0,
                                segment_ids: first_level.list_ids(),
                                // NOTE: Allow a bit of overshooting
                                target_size: ((self.target_size as f32) * 1.1) as u64,
                            })
                        } else {
                            Choice::DoNothing
                        };
                    }

                    return Choice::Merge(CompactionInput {
                        dest_level: 1,
                        segment_ids: first_level.list_ids(),
                        target_size: ((self.target_size as f32) * 1.1) as u64,
                    });
                }

                if first_level_size < self.target_size.into() {
                    // NOTE: We reached the threshold, but L0 is still very small,
                    // meaning we have very small segments, so do intra-L0 compaction
                    return Choice::Merge(CompactionInput {
                        dest_level: 0,
                        segment_ids: first_level.list_ids(),
                        target_size: self.target_size.into(),
                    });
                }

                if !busy_levels.contains(&1) {
                    let mut level = (**first_level).clone();
                    level.sort_by_key_range();

                    let Some(next_level) = &view.get(1) else {
                        return Choice::DoNothing;
                    };

                    // TODO: list_ids()
                    let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();

                    // Get overlapping segments in next level
                    let key_range = aggregate_key_range(&level);

                    let next_level_overlapping_segment_ids: Vec<_> = next_level
                        .overlapping_segments(&key_range)
                        .map(Segment::id)
                        .collect();

                    segment_ids.extend(&next_level_overlapping_segment_ids);

                    let choice = CompactionInput {
                        segment_ids,
                        dest_level: 1,
                        target_size: u64::from(self.target_size),
                    };

                    if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
                        return Choice::Move(choice);
                    }
                    return Choice::Merge(choice);
                }
            }
        }

        Choice::DoNothing
    }
}

#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into()
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

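    // NOTE: A small sanity check for `level_target_size`, using the defaults
    // defined above: base size is 64 MiB * 4 = 256 MiB, and every level
    // is 10x larger than the previous one
    #[test]
    fn leveled_target_size_progression() {
        let compactor = Strategy::default();

        assert_eq!(compactor.level_target_size(1), 256 * 1_024 * 1_024);
        assert_eq!(compactor.level_target_size(2), 2_560 * 1_024 * 1_024);
        assert_eq!(compactor.level_target_size(3), 25_600 * 1_024 * 1_024);
    }
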
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}