lsm_tree/compaction/
tiered.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{level_manifest::LevelManifest, Config, HashSet, Segment};

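/// Computes the target size of a level in bytes: `base_size * ratio^(level_idx + 1)`.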
fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, base_size: u32) -> usize {
    (ratio as usize).pow(u32::from(level_idx + 1)) * (base_size as usize)
}

/// Size-tiered compaction strategy (STCS)
///
/// When a level reaches its size threshold, its segments are merged into a larger segment in the next level.
///
/// STCS suffers from high read amplification and temporarily doubled space amplification, but has low write amplification.
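///
/// The default configuration uses a `base_size` of 64 MiB and a `level_ratio` of 4.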
#[derive(Clone)]
pub struct Strategy {
    /// Base size in bytes
    pub base_size: u32,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// A level's target size is: base_size * level_ratio.pow(#level + 1)
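    ///
    /// For example, with the default base_size of 64 MiB and a level_ratio of 4,
    /// the target sizes are 256 MiB for L0, 1 GiB for L1, 4 GiB for L2, and so on.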
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Strategy {
    /// Creates a new STCS strategy with a custom base size and level ratio
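    ///
    /// A minimal usage sketch (doctest ignored since the public re-export path may differ):
    ///
    /// ```ignore
    /// let strategy = Strategy::new(8 * 1_024 * 1_024, 4); // 8 MiB base size, ratio 4
    /// ```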
    #[must_use]
    pub fn new(base_size: u32, level_ratio: u8) -> Self {
        Self {
            base_size,
            level_ratio,
        }
    }
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            base_size: 64 * 1_024 * 1_024,
            level_ratio: 4,
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "TieredStrategy"
    }

    fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
        let resolved_view = levels.resolved_view();

        for (curr_level_index, level) in resolved_view
            .iter()
            .enumerate()
            .take(resolved_view.len() - 1)
            .rev()
        {
            // NOTE: Level count is 255 max
            #[allow(clippy::cast_possible_truncation)]
            let curr_level_index = curr_level_index as u8;

            let next_level_index = curr_level_index + 1;

            if level.is_empty() {
                continue;
            }

            let level_size: u64 = level
                .segments
                .iter()
                // NOTE: Exclude segments that are already being compacted (hidden),
                // otherwise we may be overcompensating
                .filter(|x| !levels.hidden_set().is_hidden(x.id()))
                .map(|x| x.metadata.file_size)
                .sum();

            let desired_bytes =
                desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.base_size)
                    as u64;

            if level_size >= desired_bytes {
                // NOTE: Use desired_bytes as the budget because we are in tiered mode:
                // we want to take N whole segments, not just the overshoot (as in leveled compaction)
                let mut overshoot = desired_bytes;

                let mut segments_to_compact = vec![];

                for segment in level.iter().rev().take(self.level_ratio.into()).cloned() {
                    if overshoot == 0 {
                        break;
                    }

                    overshoot = overshoot.saturating_sub(segment.metadata.file_size);
                    segments_to_compact.push(segment);
                }

                let mut segment_ids: HashSet<_> =
                    segments_to_compact.iter().map(Segment::id).collect();

                // NOTE: If the destination level is the last level, just overwrite it
                //
                // If we didn't overwrite Lmax, it would end up amassing more and more
                // segments
                // Also, because it's the last level, the frequency of overwriting it is
                // amortized because of the LSM-tree's level structure
                if next_level_index == 6 {
                    // Wait for L6 to be non-busy
                    if levels.busy_levels().contains(&next_level_index) {
                        continue;
                    }

                    segment_ids.extend(
                        levels
                            .levels
                            .last()
                            .expect("last level should always exist")
                            .list_ids(),
                    );
                }

                return Choice::Merge(CompactionInput {
                    segment_ids,
                    dest_level: next_level_index,
                    target_size: u64::MAX,
                });
            }
        }

        // TODO: after major compaction, SizeTiered may behave weirdly
        // if major compaction is not outputting into Lmax

        // TODO: if level.size >= base_size and there are enough
        // segments with size < base_size, compact them together
        // no matter the number of segments in L0 -> should reduce
        // write stall chance
        //
        // TODO: however, force compaction if L0 becomes way too large

        // NOTE: Reduce L0 segments if needed
        // This is probably an edge case if the `base_size` does not line up with
        // the `max_memtable_size` AT ALL
        super::maintenance::Strategy.choose(levels, config)
    }
}

#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{
        bloom::BloomFilter,
        cache::Cache,
        compaction::{Choice, CompactionStrategy, Input as CompactionInput},
        config::Config,
        descriptor_table::FileDescriptorTable,
        file::LEVELS_MANIFEST_FILE,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        HashSet, KeyRange, SeqNo,
    };
    use std::sync::{atomic::AtomicBool, Arc};
    use test_log::test;

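    // Builds a dummy segment with the given id, a file size of `size_mib` MiB, and the given max seqno.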
    #[allow(clippy::expect_used)]
    fn fixture_segment(id: SegmentId, size_mib: u64, max_seqno: SeqNo) -> Segment {
        let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: 0,
                id,
                file_size: size_mib * 1_024 * 1_024,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 0,
                key_count: 0,
                key_range: KeyRange::new((vec![].into(), vec![].into())),
                tombstone_count: 0,
                range_tombstone_count: 0,
                uncompressed_size: size_mib * 1_024 * 1_024,
                seqnos: (0, max_seqno),
            },
            cache,

            bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into()
    }

    #[test]
    fn tiered_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 8,
        };

        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn tiered_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 4,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        levels.add(fixture_segment(1, 8, 5));
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);

        levels.add(fixture_segment(2, 8, 6));
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);

        levels.add(fixture_segment(3, 8, 7));
        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);

        levels.add(fixture_segment(4, 8, 8));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: set![1, 2, 3, 4],
                target_size: u64::MAX,
            })
        );

        Ok(())
    }

    #[test]
    fn tiered_ordering() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 2,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        levels.add(fixture_segment(1, 8, 0));
        levels.add(fixture_segment(2, 8, 1));
        levels.add(fixture_segment(3, 8, 2));
        levels.add(fixture_segment(4, 8, 3));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: set![1, 2],
                target_size: u64::MAX,
            })
        );

        Ok(())
    }

    #[test]
    fn tiered_more_than_min() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 4,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(1, 8, 5));
        levels.add(fixture_segment(2, 8, 6));
        levels.add(fixture_segment(3, 8, 7));
        levels.add(fixture_segment(4, 8, 8));

        levels.insert_into_level(1, fixture_segment(5, 8 * 4, 9));
        levels.insert_into_level(1, fixture_segment(6, 8 * 4, 10));
        levels.insert_into_level(1, fixture_segment(7, 8 * 4, 11));
        levels.insert_into_level(1, fixture_segment(8, 8 * 4, 12));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: set![5, 6, 7, 8],
                target_size: u64::MAX,
            })
        );

        Ok(())
    }

    #[test]
    fn tiered_many_segments() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 2,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(1, 8, 5));
        levels.add(fixture_segment(2, 8, 6));
        levels.add(fixture_segment(3, 8, 7));
        levels.add(fixture_segment(4, 8, 8));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: set![1, 2],
                target_size: u64::MAX,
            })
        );

        Ok(())
    }

    #[test]
    fn tiered_deeper_level() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 2,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.add(fixture_segment(1, 8, 5));

        levels.insert_into_level(1, fixture_segment(2, 8 * 2, 6));
        levels.insert_into_level(1, fixture_segment(3, 8 * 2, 7));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: set![2, 3],
                target_size: u64::MAX,
            })
        );

        let tempdir = tempfile::tempdir()?;
        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        levels.insert_into_level(2, fixture_segment(2, 8 * 4, 5));
        levels.insert_into_level(2, fixture_segment(3, 8 * 4, 6));

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 3,
                segment_ids: set![2, 3],
                target_size: u64::MAX,
            })
        );

        Ok(())
    }

    #[test]
    fn tiered_last_level() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;

        let compactor = Strategy {
            base_size: 8 * 1_024 * 1_024,
            level_ratio: 2,
        };
        let config = Config::default();

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
        levels.insert_into_level(3, fixture_segment(2, 8, 5));
        levels.insert_into_level(3, fixture_segment(3, 8, 5));

        assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);

        Ok(())
    }
}