lsm_tree/compaction/
leveled.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7    compaction::state::{hidden_set::HiddenSet, CompactionState},
8    config::Config,
9    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
10    table::Table,
11    version::{run::Ranged, Run, Version},
12    HashSet, KeyRange, TableId,
13};
14
15pub fn aggregate_run_key_range(tables: &[Table]) -> KeyRange {
16    let lo = tables.first().expect("run should never be empty");
17    let hi = tables.last().expect("run should never be empty");
18    KeyRange::new((lo.key_range().min().clone(), hi.key_range().max().clone()))
19}
20
21/// Tries to find the most optimal compaction set from one level into the other.
22fn pick_minimal_compaction(
23    curr_run: &Run<Table>,
24    next_run: Option<&Run<Table>>,
25    hidden_set: &HiddenSet,
26    overshoot: u64,
27    table_base_size: u64,
28) -> Option<(HashSet<TableId>, bool)> {
29    // NOTE: Find largest trivial move (if it exists)
30    if let Some(window) = curr_run.shrinking_windows().find(|window| {
31        if hidden_set.is_blocked(window.iter().map(Table::id)) {
32            // IMPORTANT: Compaction is blocked because of other
33            // on-going compaction
34            return false;
35        }
36
37        let Some(next_run) = &next_run else {
38            // No run in next level, so we can trivially move
39            return true;
40        };
41
42        let key_range = aggregate_run_key_range(window);
43
44        next_run.get_overlapping(&key_range).is_empty()
45    }) {
46        let ids = window.iter().map(Table::id).collect();
47        return Some((ids, true));
48    }
49
50    // NOTE: Look for merges
51    if let Some(next_run) = &next_run {
52        next_run
53            .growing_windows()
54            .take_while(|window| {
55                // Cap at 50x tables per compaction for now
56                //
57                // At this point, all compactions are too large anyway
58                // so we can escape early
59                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
60                next_level_size <= (50 * table_base_size)
61            })
62            .filter_map(|window| {
63                if hidden_set.is_blocked(window.iter().map(Table::id)) {
64                    // IMPORTANT: Compaction is blocked because of other
65                    // on-going compaction
66                    return None;
67                }
68
69                let key_range = aggregate_run_key_range(window);
70
71                // Pull in all contained tables in current level into compaction
72                let curr_level_pull_in = curr_run.get_contained(&key_range);
73
74                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
75
76                // if curr_level_size < overshoot {
77                //     return None;
78                // }
79
80                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
81                    // IMPORTANT: Compaction is blocked because of other
82                    // on-going compaction
83                    return None;
84                }
85
86                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
87
88                //  let compaction_bytes = curr_level_size + next_level_size;
89
90                #[expect(clippy::cast_precision_loss)]
91                let write_amp = (next_level_size as f32) / (curr_level_size as f32);
92
93                Some((window, curr_level_pull_in, write_amp))
94            })
95            // Find the compaction with the smallest write amplification factor
96            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
97            .map(|(window, curr_level_pull_in, _)| {
98                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
99                ids.extend(curr_level_pull_in.iter().map(Table::id));
100                (ids, false)
101            })
102    } else {
103        None
104    }
105}
106
107/// Levelled compaction strategy (LCS)
108///
109/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
110///
111/// Each level Ln for n >= 2 can have up to `level_base_size * ratio^(n - 1)` tables.
112///
113/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
114///
115/// LCS is the recommended compaction strategy to use.
116///
117/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
118#[derive(Clone)]
119pub struct Strategy {
120    /// When the number of tables in L0 reaches this threshold,
121    /// they are merged into L1.
122    ///
123    /// Default = 4
124    ///
125    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
126    pub l0_threshold: u8,
127
128    /// The target table size as disk (possibly compressed).
129    ///
130    /// Default = 64 MiB
131    ///
132    /// Same as `target_file_size_base` in `RocksDB`.
133    pub target_size: u64,
134
135    /// Size ratio between levels of the LSM tree (a.k.a fanout, growth rate)
136    ///
137    /// This is the exponential growth of the from one.
138    /// level to the next.
139    ///
140    /// Default = 10
141    pub level_ratio_policy: Vec<f32>,
142}
143
144impl Default for Strategy {
145    fn default() -> Self {
146        Self {
147            l0_threshold: 4,
148            target_size:/* 64 Mib */ 64 * 1_024 * 1_024,
149            level_ratio_policy: vec![10.0],
150        }
151    }
152}
153
154impl Strategy {
155    /// Sets the growth ratio between levels.
156    #[must_use]
157    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
158        self.level_ratio_policy = policy;
159        self
160    }
161
162    /// Calculates the size of L1.
163    fn level_base_size(&self) -> u64 {
164        self.target_size * u64::from(self.l0_threshold)
165    }
166
167    /// Calculates the level target size.
168    ///
169    /// L1 = `level_base_size`
170    ///
171    /// L2 = `level_base_size * ratio`
172    ///
173    /// L3 = `level_base_size * ratio * ratio`
174    ///
175    /// ...
176    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
177        assert!(
178            canonical_level_idx >= 1,
179            "level_target_size does not apply to L0",
180        );
181
182        if canonical_level_idx == 1 {
183            // u64::from(self.target_size)
184            self.level_base_size()
185        } else {
186            let mut size = self.level_base_size() as f32;
187
188            // NOTE: Minus 2 because |{L0, L1}|
189            for idx in 0..=(canonical_level_idx - 2) {
190                let ratio = self
191                    .level_ratio_policy
192                    .get(usize::from(idx))
193                    .copied()
194                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
195
196                size *= ratio;
197            }
198
199            size as u64
200        }
201    }
202}
203
204impl CompactionStrategy for Strategy {
205    fn get_name(&self) -> &'static str {
206        "LeveledCompaction"
207    }
208
209    fn get_config(&self) -> Vec<crate::KvPair> {
210        vec![
211            (
212                crate::UserKey::from("leveled_l0_threshold"),
213                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
214            ),
215            (
216                crate::UserKey::from("leveled_target_size"),
217                crate::UserValue::from(self.target_size.to_le_bytes()),
218            ),
219            (
220                crate::UserKey::from("leveled_level_ratio_policy"),
221                crate::UserValue::from({
222                    use byteorder::{LittleEndian, WriteBytesExt};
223
224                    let mut v = vec![];
225
226                    v.write_u8(self.level_ratio_policy.len() as u8)
227                        .expect("cannot fail");
228
229                    for &f in &self.level_ratio_policy {
230                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
231                    }
232
233                    v
234                }),
235            ),
236        ]
237    }
238
239    #[expect(clippy::too_many_lines)]
240    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
241        assert!(version.level_count() == 7, "should have exactly 7 levels");
242
243        // Find the level that corresponds to L1
244        #[expect(clippy::map_unwrap_or)]
245        let mut canonical_l1_idx = version
246            .iter_levels()
247            .enumerate()
248            .skip(1)
249            .find(|(_, lvl)| !lvl.is_empty())
250            .map(|(idx, _)| idx)
251            .unwrap_or_else(|| version.level_count() - 1);
252
253        // Number of levels we have to shift to get from the actual level idx to the canonical
254        let mut level_shift = canonical_l1_idx - 1;
255
256        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
257            let need_new_l1 = version
258                .iter_levels()
259                .enumerate()
260                .skip(1)
261                .filter(|(_, lvl)| !lvl.is_empty())
262                .all(|(idx, level)| {
263                    let level_size = level
264                        .iter()
265                        .flat_map(|x| x.iter())
266                        // NOTE: Take bytes that are already being compacted into account,
267                        // otherwise we may be overcompensating
268                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
269                        .map(Table::file_size)
270                        .sum::<u64>();
271
272                    let target_size = self.level_target_size((idx - level_shift) as u8);
273
274                    level_size > target_size
275                });
276
277            // Move up L1 one level if all current levels are at capacity
278            if need_new_l1 {
279                canonical_l1_idx -= 1;
280                level_shift -= 1;
281            }
282        }
283
284        // Trivial move into L1
285        'trivial: {
286            let first_level = version.l0();
287
288            if first_level.run_count() == 1 {
289                if version.level_is_busy(0, state.hidden_set())
290                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
291                {
292                    break 'trivial;
293                }
294
295                let Some(target_level) = &version.level(canonical_l1_idx) else {
296                    break 'trivial;
297                };
298
299                if target_level.run_count() != 1 {
300                    break 'trivial;
301                }
302
303                let key_range = first_level.aggregate_key_range();
304
305                // Get overlapping tables in next level
306                let get_overlapping = target_level
307                    .iter()
308                    .flat_map(|run| run.get_overlapping(&key_range))
309                    .map(Table::id)
310                    .next();
311
312                if get_overlapping.is_none() && first_level.is_disjoint() {
313                    return Choice::Move(CompactionInput {
314                        table_ids: first_level.list_ids(),
315                        dest_level: canonical_l1_idx as u8,
316                        canonical_level: 1,
317                        target_size: self.target_size,
318                    });
319                }
320            }
321        }
322
323        // Scoring
324        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];
325
326        {
327            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
328            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
329            // decisions can prioritize tables that would free the most reclaimable values.
330
331            // Score first level
332
333            let first_level = version.l0();
334
335            // TODO: use run_count instead? but be careful because of version free list GC thingy
336            if first_level.table_count() >= usize::from(self.l0_threshold) {
337                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
338                scores[0] = (ratio, 0);
339            }
340
341            // Score L1+
342            for (idx, level) in version.iter_levels().enumerate().skip(1) {
343                if level.is_empty() {
344                    continue;
345                }
346
347                let level_size = level
348                    .iter()
349                    .flat_map(|x| x.iter())
350                    // NOTE: Take bytes that are already being compacted into account,
351                    // otherwise we may be overcompensating
352                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
353                    .map(Table::file_size)
354                    .sum::<u64>();
355
356                let target_size = self.level_target_size((idx - level_shift) as u8);
357
358                // NOTE: We check for level length above
359                #[expect(clippy::indexing_slicing)]
360                if level_size > target_size {
361                    scores[idx] = (
362                        level_size as f64 / target_size as f64,
363                        level_size - target_size,
364                    );
365
366                    // NOTE: Force a trivial move
367                    if version
368                        .level(idx + 1)
369                        .is_some_and(|next_level| next_level.is_empty())
370                    {
371                        scores[idx] = (99.99, 999);
372                    }
373                }
374            }
375
376            // NOTE: Never score Lmax
377            //
378            // NOTE: We check for level length above
379            #[expect(clippy::indexing_slicing)]
380            {
381                scores[6] = (0.0, 0);
382            }
383        }
384
385        // Choose compaction
386        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
387            .into_iter()
388            .enumerate()
389            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
390                score_a
391                    .partial_cmp(score_b)
392                    .unwrap_or(std::cmp::Ordering::Equal)
393            })
394            .expect("should have highest score somewhere");
395
396        if score < 1.0 {
397            return Choice::DoNothing;
398        }
399
400        // We choose L0->L1 compaction
401        if level_idx_with_highest_score == 0 {
402            let Some(first_level) = version.level(0) else {
403                return Choice::DoNothing;
404            };
405
406            if version.level_is_busy(0, state.hidden_set())
407                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
408            {
409                return Choice::DoNothing;
410            }
411
412            let Some(target_level) = &version.level(canonical_l1_idx) else {
413                return Choice::DoNothing;
414            };
415
416            let mut table_ids = first_level.list_ids();
417
418            let key_range = first_level.aggregate_key_range();
419
420            // Get overlapping tables in next level
421            let target_level_overlapping_table_ids: Vec<_> = target_level
422                .iter()
423                .flat_map(|run| run.get_overlapping(&key_range))
424                .map(Table::id)
425                .collect();
426
427            table_ids.extend(&target_level_overlapping_table_ids);
428
429            let choice = CompactionInput {
430                table_ids,
431                dest_level: canonical_l1_idx as u8,
432                canonical_level: 1,
433                target_size: self.target_size,
434            };
435
436            /* eprintln!(
437                "merge {} tables, L0->L1: {:?}",
438                choice.segment_ids.len(),
439                choice.segment_ids,
440            ); */
441
442            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
443                return Choice::Move(choice);
444            }
445            return Choice::Merge(choice);
446        }
447
448        // We choose L1+ compaction instead
449
450        // NOTE: Level count is 255 max
451        #[expect(clippy::cast_possible_truncation)]
452        let curr_level_index = level_idx_with_highest_score as u8;
453
454        let next_level_index = curr_level_index + 1;
455
456        let Some(level) = version.level(level_idx_with_highest_score) else {
457            return Choice::DoNothing;
458        };
459
460        let Some(next_level) = version.level(next_level_index as usize) else {
461            return Choice::DoNothing;
462        };
463
464        debug_assert!(level.is_disjoint(), "level should be disjoint");
465        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");
466
467        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
468            level.first_run().expect("should have exactly one run"),
469            next_level.first_run().map(std::ops::Deref::deref),
470            state.hidden_set(),
471            overshoot_bytes,
472            self.target_size,
473        ) else {
474            return Choice::DoNothing;
475        };
476
477        let choice = CompactionInput {
478            table_ids,
479            dest_level: next_level_index,
480            canonical_level: next_level_index - (level_shift as u8),
481            target_size: self.target_size,
482        };
483
484        /* eprintln!(
485            "{} {} tables, L{}->L{next_level_index}: {:?}",
486            if can_trivial_move { "move" } else { "merge" },
487            choice.segment_ids.len(),
488            next_level_index - 1,
489            choice.segment_ids,
490        ); */
491
492        if can_trivial_move && level.is_disjoint() {
493            return Choice::Move(choice);
494        }
495        Choice::Merge(choice)
496    }
497}
498
499/*
500#[cfg(test)]
501mod tests {
502    use super::{Choice, Strategy};
503    use crate::{
504        cache::Cache,
505        compaction::{CompactionStrategy, Input as CompactionInput},
506        descriptor_table::FileDescriptorTable,
507        level_manifest::LevelManifest,
508        segment::{
509            block::offset::BlockOffset,
510            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
511            file_offsets::FileOffsets,
512            meta::{Metadata, SegmentId},
513            SegmentInner,
514        },
515        super_segment::Segment,
516        time::unix_timestamp,
517        Config, HashSet, KeyRange,
518    };
519    use std::{
520        path::Path,
521        sync::{atomic::AtomicBool, Arc},
522    };
523    use test_log::test;
524
525    fn string_key_range(a: &str, b: &str) -> KeyRange {
526        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
527    }
528
529    #[allow(
530        clippy::expect_used,
531        clippy::cast_possible_truncation,
532        clippy::cast_sign_loss
533    )]
534    fn fixture_segment(
535        id: SegmentId,
536        key_range: KeyRange,
537        size: u64,
538        tombstone_ratio: f32,
539    ) -> Segment {
540        todo!()
541
542        /*   let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
543
544        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
545        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
546
547        SegmentInner {
548            tree_id: 0,
549            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
550            block_index,
551
552            offsets: FileOffsets {
553                bloom_ptr: BlockOffset(0),
554                range_filter_ptr: BlockOffset(0),
555                index_block_ptr: BlockOffset(0),
556                metadata_ptr: BlockOffset(0),
557                range_tombstones_ptr: BlockOffset(0),
558                tli_ptr: BlockOffset(0),
559                pfx_ptr: BlockOffset(0),
560            },
561
562            metadata: Metadata {
563                data_block_count: 0,
564                index_block_count: 0,
565                data_block_size: 4_096,
566                index_block_size: 4_096,
567                created_at: unix_timestamp().as_nanos(),
568                id,
569                file_size: size,
570                compression: crate::segment::meta::CompressionType::None,
571                table_type: crate::segment::meta::TableType::Block,
572                item_count: 1_000_000,
573                key_count: 0,
574                key_range,
575                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
576                range_tombstone_count: 0,
577                uncompressed_size: 0,
578                seqnos: (0, 0),
579            },
580            cache,
581
582            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
583
584            path: "a".into(),
585            is_deleted: AtomicBool::default(),
586        }
587        .into() */
588    }
589
590    #[allow(clippy::expect_used)]
591    fn build_levels(
592        path: &Path,
593        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
594    ) -> crate::Result<LevelManifest> {
595        let mut levels = LevelManifest::create_new(
596            recipe.len().try_into().expect("oopsie"),
597            path.join("levels"),
598        )?;
599
600        for (idx, level) in recipe.into_iter().enumerate() {
601            for (id, min, max, size_mib) in level {
602                levels.insert_into_level(
603                    idx.try_into().expect("oopsie"),
604                    fixture_segment(
605                        id,
606                        string_key_range(min, max),
607                        size_mib * 1_024 * 1_024,
608                        0.0,
609                    ),
610                );
611            }
612        }
613
614        Ok(levels)
615    }
616
617    #[test]
618    fn leveled_empty_levels() -> crate::Result<()> {
619        let tempdir = tempfile::tempdir()?;
620        let compactor = Strategy::default();
621
622        #[rustfmt::skip]
623        let levels = build_levels(tempdir.path(), vec![
624            vec![],
625            vec![],
626            vec![],
627            vec![],
628        ])?;
629
630        assert_eq!(
631            compactor.choose(&levels, &Config::default()),
632            Choice::DoNothing
633        );
634
635        Ok(())
636    }
637
638    #[test]
639    fn leveled_default_l0() -> crate::Result<()> {
640        let tempdir = tempfile::tempdir()?;
641        let compactor = Strategy {
642            target_size: 64 * 1_024 * 1_024,
643            ..Default::default()
644        };
645
646        #[rustfmt::skip]
647        let mut levels = build_levels(tempdir.path(), vec![
648            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
649            vec![],
650            vec![],
651            vec![],
652        ])?;
653
654        assert_eq!(
655            compactor.choose(&levels, &Config::default()),
656            Choice::Merge(CompactionInput {
657                dest_level: 1,
658                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
659                target_size: 64 * 1_024 * 1_024
660            })
661        );
662
663        levels.hide_segments(std::iter::once(4));
664
665        assert_eq!(
666            compactor.choose(&levels, &Config::default()),
667            Choice::DoNothing
668        );
669
670        Ok(())
671    }
672
673    #[test]
674    #[allow(
675        clippy::cast_sign_loss,
676        clippy::cast_precision_loss,
677        clippy::cast_possible_truncation
678    )]
679    fn leveled_intra_l0() -> crate::Result<()> {
680        let tempdir = tempfile::tempdir()?;
681        let compactor = Strategy {
682            target_size: 64 * 1_024 * 1_024,
683            ..Default::default()
684        };
685
686        #[rustfmt::skip]
687        let mut levels = build_levels(tempdir.path(), vec![
688            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
689            vec![],
690            vec![],
691            vec![],
692        ])?;
693
694        assert_eq!(
695            compactor.choose(&levels, &Config::default()),
696            Choice::Merge(CompactionInput {
697                dest_level: 0,
698                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
699                target_size: u64::from(compactor.target_size),
700            })
701        );
702
703        levels.hide_segments(std::iter::once(4));
704
705        assert_eq!(
706            compactor.choose(&levels, &Config::default()),
707            Choice::DoNothing
708        );
709
710        Ok(())
711    }
712
713    #[test]
714    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
715        let tempdir = tempfile::tempdir()?;
716        let compactor = Strategy {
717            target_size: 64 * 1_024 * 1_024,
718            ..Default::default()
719        };
720
721        #[rustfmt::skip]
722        let levels = build_levels(tempdir.path(), vec![
723            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
724            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
725            vec![],
726            vec![],
727        ])?;
728
729        assert_eq!(
730            compactor.choose(&levels, &Config::default()),
731            Choice::Merge(CompactionInput {
732                dest_level: 1,
733                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
734                target_size: 64 * 1_024 * 1_024
735            })
736        );
737
738        Ok(())
739    }
740
741    #[test]
742    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
743        let tempdir = tempfile::tempdir()?;
744        let compactor = Strategy {
745            target_size: 64 * 1_024 * 1_024,
746            ..Default::default()
747        };
748
749        #[rustfmt::skip]
750        let mut levels = build_levels(tempdir.path(), vec![
751            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
752            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
753            vec![],
754            vec![],
755        ])?;
756
757        assert_eq!(
758            compactor.choose(&levels, &Config::default()),
759            Choice::Merge(CompactionInput {
760                dest_level: 1,
761                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
762                target_size: 64 * 1_024 * 1_024
763            })
764        );
765
766        levels.hide_segments(std::iter::once(5));
767        assert_eq!(
768            compactor.choose(&levels, &Config::default()),
769            Choice::DoNothing
770        );
771
772        Ok(())
773    }
774
775    #[test]
776    fn levelled_from_tiered() -> crate::Result<()> {
777        let tempdir = tempfile::tempdir()?;
778        let compactor = Strategy {
779            target_size: 64 * 1_024 * 1_024,
780            ..Default::default()
781        };
782        let config = Config::default();
783
784        #[rustfmt::skip]
785        let levels = build_levels(tempdir.path(), vec![
786            vec![],
787            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
788            vec![(4, "a", "g", 64)],
789            vec![],
790        ])?;
791
792        assert_eq!(
793            compactor.choose(&levels, &config),
794            Choice::Merge(CompactionInput {
795                dest_level: 2,
796                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
797                target_size: 64 * 1_024 * 1_024
798            })
799        );
800
801        Ok(())
802    }
803}
804 */