lsm_tree/compaction/
leveled.rs

// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::{util::aggregate_run_key_range, Table},
    version::{Run, Version},
    HashSet, TableId,
};

#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";

/// Tries to find the optimal compaction set from one level into the next.
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    _overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
    // NOTE: Find largest trivial move (if it exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap at 50x tables per compaction for now
                //
                // At this point, all compactions are too large anyway
                // so we can escape early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull in all contained tables in current level into compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                //  let compaction_bytes = curr_level_size + next_level_size;

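                // Estimated write amplification for this candidate: bytes that would be
                // rewritten in the next level per byte moved down from the current level.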
                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches its threshold size, parts of it are merged into the overlapping tables of the next level.
///
/// Each level Ln for n >= 2 can hold up to `level_base_size * ratio^(n - 1)` bytes of data.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
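///
/// A minimal construction sketch (illustrative only; the values mirror the documented defaults):
///
/// ```ignore
/// let strategy = Strategy {
///     l0_threshold: 4,                  // merge L0 into L1 once L0 holds 4 tables
///     target_size: 64 * 1_024 * 1_024,  // 64 MiB target table size on disk
///     level_ratio_policy: vec![10.0],   // each level is ~10x larger than the previous
/// };
/// ```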
#[derive(Clone)]
pub struct Strategy {
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the exponential growth factor from one level to the next.
    /// Each entry applies to one level transition; the last entry is reused
    /// for all deeper levels.
    ///
    /// Default = `vec![10.0]`
    pub level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Calculates the size of L1.
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            // u64::from(self.target_size)
            self.level_base_size()
        } else {
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because |{L0, L1}|
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            size as u64
        }
    }
}
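
// A hedged sanity-check sketch for the sizing math above: with the default
// configuration (64 MiB target tables, an L0 threshold of 4, and a ratio of 10),
// L1 targets 256 MiB and each deeper level is 10x larger. The module name and
// assertions are illustrative and not part of the original test suite.
#[cfg(test)]
mod level_sizing_sketch {
    use super::Strategy;

    #[test]
    fn level_target_sizes_grow_by_ratio() {
        let strategy = Strategy::default();

        // L1 = target_size * l0_threshold = 64 MiB * 4 = 256 MiB
        assert_eq!(strategy.level_base_size(), 256 * 1_024 * 1_024);
        assert_eq!(strategy.level_target_size(1), 256 * 1_024 * 1_024);

        // L2 = 256 MiB * 10, L3 = 256 MiB * 10 * 10
        assert_eq!(strategy.level_target_size(2), 2_560 * 1_024 * 1_024);
        assert_eq!(strategy.level_target_size(3), 25_600 * 1_024 * 1_024);
    }

    #[test]
    fn per_level_ratio_policy_is_respected() {
        // Hypothetical policy: 10x from L1 to L2, then 5x for all deeper levels
        let strategy = Strategy::default().with_level_ratio_policy(vec![10.0, 5.0]);

        assert_eq!(strategy.level_target_size(2), 2_560 * 1_024 * 1_024);
        assert_eq!(strategy.level_target_size(3), 12_800 * 1_024 * 1_024);
    }
}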

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

                    let mut v = vec![];

                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level index
        // to the canonical one
        let mut level_shift = canonical_l1_idx - 1;
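        // e.g. if L1..=L3 are empty and L4 is the first non-empty level, then
        // canonical_l1_idx = 4 and level_shift = 3, so the on-disk L4 is sized
        // as if it were the canonical L1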

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Don't count tables that are already being compacted,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move L1 up one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Get overlapping tables in next level
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
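        //
        // Each level gets a score: for L0 it is the table count divided by
        // `l0_threshold`, for L1+ it is the level's (non-hidden) byte size divided
        // by its target size. A score >= 1.0 means a level is over capacity; the
        // level with the highest score is compacted, and its overshoot (bytes above
        // target) is passed along to `pick_minimal_compaction`.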
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score first level

            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Don't count tables that are already being compacted,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check for level length above
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            /* eprintln!(
                "merge {} tables, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        /* eprintln!(
            "{} {} tables, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}

/*
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            SegmentInner,
        },
        super_segment::Segment,
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        todo!()

        /*   let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into() */
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
 */