lsm_tree/compaction/leveled.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 compaction::state::{hidden_set::HiddenSet, CompactionState},
8 config::Config,
9 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
10 table::{util::aggregate_run_key_range, Table},
11 version::{Run, Version},
12 HashSet, TableId,
13};
14
15#[doc(hidden)]
16pub const NAME: &str = "LeveledCompaction";
17
18/// Tries to find the most optimal compaction set from one level into the other.
19fn pick_minimal_compaction(
20 curr_run: &Run<Table>,
21 next_run: Option<&Run<Table>>,
22 hidden_set: &HiddenSet,
23 _overshoot: u64,
24 table_base_size: u64,
25) -> Option<(HashSet<TableId>, bool)> {
26 // NOTE: Find largest trivial move (if it exists)
27 if let Some(window) = curr_run.shrinking_windows().find(|window| {
28 if hidden_set.is_blocked(window.iter().map(Table::id)) {
29 // IMPORTANT: Compaction is blocked because of other
30 // on-going compaction
31 return false;
32 }
33
34 let Some(next_run) = &next_run else {
35 // No run in next level, so we can trivially move
36 return true;
37 };
38
39 let key_range = aggregate_run_key_range(window);
40
41 next_run.get_overlapping(&key_range).is_empty()
42 }) {
43 let ids = window.iter().map(Table::id).collect();
44 return Some((ids, true));
45 }
46
47 // NOTE: Look for merges
48 if let Some(next_run) = &next_run {
49 next_run
50 .growing_windows()
51 .take_while(|window| {
52 // Cap at 50x tables per compaction for now
53 //
54 // At this point, all compactions are too large anyway
55 // so we can escape early
56 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
57 next_level_size <= (50 * table_base_size)
58 })
59 .filter_map(|window| {
60 if hidden_set.is_blocked(window.iter().map(Table::id)) {
61 // IMPORTANT: Compaction is blocked because of other
62 // on-going compaction
63 return None;
64 }
65
66 let key_range = aggregate_run_key_range(window);
67
68 // Pull in all contained tables in current level into compaction
69 let curr_level_pull_in = curr_run.get_contained(&key_range);
70
71 let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
72
73 // if curr_level_size < overshoot {
74 // return None;
75 // }
76
77 if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
78 // IMPORTANT: Compaction is blocked because of other
79 // on-going compaction
80 return None;
81 }
82
83 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
84
85 // let compaction_bytes = curr_level_size + next_level_size;
86
87 #[expect(clippy::cast_precision_loss)]
88 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
89
90 Some((window, curr_level_pull_in, write_amp))
91 })
92 // Find the compaction with the smallest write amplification factor
93 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
94 .map(|(window, curr_level_pull_in, _)| {
95 let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
96 ids.extend(curr_level_pull_in.iter().map(Table::id));
97 (ids, false)
98 })
99 } else {
100 None
101 }
102}
103
/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 1 has a target size of `level_base_size * ratio^(n - 1)` bytes
/// (with a uniform ratio; see the level ratio policy for per-level ratios).
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
#[derive(Clone)]
pub struct Strategy {
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a fanout, growth rate)
    ///
    /// This is the factor by which the target size grows from one
    /// level to the next.
    ///
    /// The first entry is the ratio between L1 and L2, the second one
    /// between L2 and L3, and so on. If there are fewer entries than
    /// level transitions, the last entry is reused; if the policy is
    /// empty, a ratio of 10.0 is used.
    ///
    /// Default = [10.0]
    level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratios between levels.
    ///
    /// The first entry is used for L1 -> L2, the second one for L2 -> L3, etc.
    /// The last entry is reused for all remaining levels.
    ///
    /// Default = [10.0]
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Sets the L0 threshold.
    ///
    /// Default = 4
    #[must_use]
    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
        self.l0_threshold = threshold;
        self
    }

    /// Sets the table target size.
    ///
    /// Default = 64 MiB
    #[must_use]
    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
        self.target_size = bytes;
        self
    }

    /// Calculates the size of L1 in bytes (`target_size * l0_threshold`).
    ///
    /// Saturates at `u64::MAX` instead of overflowing.
    fn level_base_size(&self) -> u64 {
        self.target_size
            .saturating_mul(u64::from(self.l0_threshold))
    }

    /// Calculates the level target size in bytes.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
    ///
    /// # Panics
    ///
    /// Panics if `canonical_level_idx` is 0, because L0 has no target size.
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        let base_size = self.level_base_size();

        // NOTE: One growth step per level above L1
        let growth_steps = usize::from(canonical_level_idx - 1);

        if growth_steps == 0 {
            return base_size;
        }

        // Levels beyond the end of the policy reuse its last ratio
        let fallback_ratio = self.level_ratio_policy.last().copied().unwrap_or(10.0);

        // NOTE: Calculate in f64; f32 only has 24 bits of precision, so it would
        // already round sizes above 16 MiB
        let size = (0..growth_steps).fold(base_size as f64, |size, step| {
            let ratio = self
                .level_ratio_policy
                .get(step)
                .copied()
                .unwrap_or(fallback_ratio);

            size * f64::from(ratio)
        });

        // NOTE: Float-to-integer `as` casts saturate, so too large values
        // end up as u64::MAX
        size as u64
    }
}
220
221impl CompactionStrategy for Strategy {
222 fn get_name(&self) -> &'static str {
223 NAME
224 }
225
226 fn get_config(&self) -> Vec<crate::KvPair> {
227 vec![
228 (
229 crate::UserKey::from("leveled_l0_threshold"),
230 crate::UserValue::from(self.l0_threshold.to_le_bytes()),
231 ),
232 (
233 crate::UserKey::from("leveled_target_size"),
234 crate::UserValue::from(self.target_size.to_le_bytes()),
235 ),
236 (
237 crate::UserKey::from("leveled_level_ratio_policy"),
238 crate::UserValue::from({
239 use byteorder::{LittleEndian, WriteBytesExt};
240
241 let mut v = vec![];
242
243 v.write_u8(self.level_ratio_policy.len() as u8)
244 .expect("cannot fail");
245
246 for &f in &self.level_ratio_policy {
247 v.write_f32::<LittleEndian>(f).expect("cannot fail");
248 }
249
250 v
251 }),
252 ),
253 ]
254 }
255
256 #[expect(clippy::too_many_lines)]
257 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
258 assert!(version.level_count() == 7, "should have exactly 7 levels");
259
260 // Find the level that corresponds to L1
261 #[expect(clippy::map_unwrap_or)]
262 let mut canonical_l1_idx = version
263 .iter_levels()
264 .enumerate()
265 .skip(1)
266 .find(|(_, lvl)| !lvl.is_empty())
267 .map(|(idx, _)| idx)
268 .unwrap_or_else(|| version.level_count() - 1);
269
270 // Number of levels we have to shift to get from the actual level idx to the canonical
271 let mut level_shift = canonical_l1_idx - 1;
272
273 if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
274 let need_new_l1 = version
275 .iter_levels()
276 .enumerate()
277 .skip(1)
278 .filter(|(_, lvl)| !lvl.is_empty())
279 .all(|(idx, level)| {
280 let level_size = level
281 .iter()
282 .flat_map(|x| x.iter())
283 // NOTE: Take bytes that are already being compacted into account,
284 // otherwise we may be overcompensating
285 .filter(|x| !state.hidden_set().is_hidden(x.id()))
286 .map(Table::file_size)
287 .sum::<u64>();
288
289 let target_size = self.level_target_size((idx - level_shift) as u8);
290
291 level_size > target_size
292 });
293
294 // Move up L1 one level if all current levels are at capacity
295 if need_new_l1 {
296 canonical_l1_idx -= 1;
297 level_shift -= 1;
298 }
299 }
300
301 // Trivial move into L1
302 'trivial: {
303 let first_level = version.l0();
304
305 if first_level.run_count() == 1 {
306 if version.level_is_busy(0, state.hidden_set())
307 || version.level_is_busy(canonical_l1_idx, state.hidden_set())
308 {
309 break 'trivial;
310 }
311
312 let Some(target_level) = &version.level(canonical_l1_idx) else {
313 break 'trivial;
314 };
315
316 if target_level.run_count() != 1 {
317 break 'trivial;
318 }
319
320 let key_range = first_level.aggregate_key_range();
321
322 // Get overlapping tables in next level
323 let get_overlapping = target_level
324 .iter()
325 .flat_map(|run| run.get_overlapping(&key_range))
326 .map(Table::id)
327 .next();
328
329 if get_overlapping.is_none() && first_level.is_disjoint() {
330 return Choice::Move(CompactionInput {
331 table_ids: first_level.list_ids(),
332 dest_level: canonical_l1_idx as u8,
333 canonical_level: 1,
334 target_size: self.target_size,
335 });
336 }
337 }
338 }
339
340 // Scoring
341 let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];
342
343 {
344 // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
345 // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
346 // decisions can prioritize tables that would free the most reclaimable values.
347
348 // Score first level
349
350 let first_level = version.l0();
351
352 // TODO: use run_count instead? but be careful because of version free list GC thingy
353 if first_level.table_count() >= usize::from(self.l0_threshold) {
354 let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
355 scores[0] = (ratio, 0);
356 }
357
358 // Score L1+
359 for (idx, level) in version.iter_levels().enumerate().skip(1) {
360 if level.is_empty() {
361 continue;
362 }
363
364 let level_size = level
365 .iter()
366 .flat_map(|x| x.iter())
367 // NOTE: Take bytes that are already being compacted into account,
368 // otherwise we may be overcompensating
369 .filter(|x| !state.hidden_set().is_hidden(x.id()))
370 .map(Table::file_size)
371 .sum::<u64>();
372
373 let target_size = self.level_target_size((idx - level_shift) as u8);
374
375 // NOTE: We check for level length above
376 #[expect(clippy::indexing_slicing)]
377 if level_size > target_size {
378 scores[idx] = (
379 level_size as f64 / target_size as f64,
380 level_size - target_size,
381 );
382
383 // NOTE: Force a trivial move
384 if version
385 .level(idx + 1)
386 .is_some_and(|next_level| next_level.is_empty())
387 {
388 scores[idx] = (99.99, 999);
389 }
390 }
391 }
392
393 // NOTE: Never score Lmax
394 {
395 scores[6] = (0.0, 0);
396 }
397 }
398
399 // Choose compaction
400 let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
401 .into_iter()
402 .enumerate()
403 .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
404 score_a
405 .partial_cmp(score_b)
406 .unwrap_or(std::cmp::Ordering::Equal)
407 })
408 .expect("should have highest score somewhere");
409
410 if score < 1.0 {
411 return Choice::DoNothing;
412 }
413
414 // We choose L0->L1 compaction
415 if level_idx_with_highest_score == 0 {
416 let Some(first_level) = version.level(0) else {
417 return Choice::DoNothing;
418 };
419
420 if version.level_is_busy(0, state.hidden_set())
421 || version.level_is_busy(canonical_l1_idx, state.hidden_set())
422 {
423 return Choice::DoNothing;
424 }
425
426 let Some(target_level) = &version.level(canonical_l1_idx) else {
427 return Choice::DoNothing;
428 };
429
430 let mut table_ids = first_level.list_ids();
431
432 let key_range = first_level.aggregate_key_range();
433
434 // Get overlapping tables in next level
435 let target_level_overlapping_table_ids: Vec<_> = target_level
436 .iter()
437 .flat_map(|run| run.get_overlapping(&key_range))
438 .map(Table::id)
439 .collect();
440
441 table_ids.extend(&target_level_overlapping_table_ids);
442
443 let choice = CompactionInput {
444 table_ids,
445 dest_level: canonical_l1_idx as u8,
446 canonical_level: 1,
447 target_size: self.target_size,
448 };
449
450 /* eprintln!(
451 "merge {} tables, L0->L1: {:?}",
452 choice.segment_ids.len(),
453 choice.segment_ids,
454 ); */
455
456 if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
457 return Choice::Move(choice);
458 }
459 return Choice::Merge(choice);
460 }
461
462 // We choose L1+ compaction instead
463
464 // NOTE: Level count is 255 max
465 #[expect(clippy::cast_possible_truncation)]
466 let curr_level_index = level_idx_with_highest_score as u8;
467
468 let next_level_index = curr_level_index + 1;
469
470 let Some(level) = version.level(level_idx_with_highest_score) else {
471 return Choice::DoNothing;
472 };
473
474 let Some(next_level) = version.level(next_level_index as usize) else {
475 return Choice::DoNothing;
476 };
477
478 debug_assert!(level.is_disjoint(), "level should be disjoint");
479 debug_assert!(next_level.is_disjoint(), "next level should be disjoint");
480
481 let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
482 level.first_run().expect("should have exactly one run"),
483 next_level.first_run().map(std::ops::Deref::deref),
484 state.hidden_set(),
485 overshoot_bytes,
486 self.target_size,
487 ) else {
488 return Choice::DoNothing;
489 };
490
491 let choice = CompactionInput {
492 table_ids,
493 dest_level: next_level_index,
494 canonical_level: next_level_index - (level_shift as u8),
495 target_size: self.target_size,
496 };
497
498 /* eprintln!(
499 "{} {} tables, L{}->L{next_level_index}: {:?}",
500 if can_trivial_move { "move" } else { "merge" },
501 choice.segment_ids.len(),
502 next_level_index - 1,
503 choice.segment_ids,
504 ); */
505
506 if can_trivial_move && level.is_disjoint() {
507 return Choice::Move(choice);
508 }
509 Choice::Merge(choice)
510 }
511}
512
513/*
514#[cfg(test)]
515mod tests {
516 use super::{Choice, Strategy};
517 use crate::{
518 cache::Cache,
519 compaction::{CompactionStrategy, Input as CompactionInput},
520 descriptor_table::FileDescriptorTable,
521 level_manifest::LevelManifest,
522 segment::{
523 block::offset::BlockOffset,
524 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
525 file_offsets::FileOffsets,
526 meta::{Metadata, SegmentId},
527 SegmentInner,
528 },
529 super_segment::Segment,
530 time::unix_timestamp,
531 Config, HashSet, KeyRange,
532 };
533 use std::{
534 path::Path,
535 sync::{atomic::AtomicBool, Arc},
536 };
537 use test_log::test;
538
539 fn string_key_range(a: &str, b: &str) -> KeyRange {
540 KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
541 }
542
543 #[allow(
544 clippy::expect_used,
545 clippy::cast_possible_truncation,
546 clippy::cast_sign_loss
547 )]
548 fn fixture_segment(
549 id: SegmentId,
550 key_range: KeyRange,
551 size: u64,
552 tombstone_ratio: f32,
553 ) -> Segment {
554 todo!()
555
556 /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
557
558 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
559 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
560
561 SegmentInner {
562 tree_id: 0,
563 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
564 block_index,
565
566 offsets: FileOffsets {
567 bloom_ptr: BlockOffset(0),
568 range_filter_ptr: BlockOffset(0),
569 index_block_ptr: BlockOffset(0),
570 metadata_ptr: BlockOffset(0),
571 range_tombstones_ptr: BlockOffset(0),
572 tli_ptr: BlockOffset(0),
573 pfx_ptr: BlockOffset(0),
574 },
575
576 metadata: Metadata {
577 data_block_count: 0,
578 index_block_count: 0,
579 data_block_size: 4_096,
580 index_block_size: 4_096,
581 created_at: unix_timestamp().as_nanos(),
582 id,
583 file_size: size,
584 compression: crate::segment::meta::CompressionType::None,
585 table_type: crate::segment::meta::TableType::Block,
586 item_count: 1_000_000,
587 key_count: 0,
588 key_range,
589 tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
590 range_tombstone_count: 0,
591 uncompressed_size: 0,
592 seqnos: (0, 0),
593 },
594 cache,
595
596 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
597
598 path: "a".into(),
599 is_deleted: AtomicBool::default(),
600 }
601 .into() */
602 }
603
604 #[allow(clippy::expect_used)]
605 fn build_levels(
606 path: &Path,
607 recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
608 ) -> crate::Result<LevelManifest> {
609 let mut levels = LevelManifest::create_new(
610 recipe.len().try_into().expect("oopsie"),
611 path.join("levels"),
612 )?;
613
614 for (idx, level) in recipe.into_iter().enumerate() {
615 for (id, min, max, size_mib) in level {
616 levels.insert_into_level(
617 idx.try_into().expect("oopsie"),
618 fixture_segment(
619 id,
620 string_key_range(min, max),
621 size_mib * 1_024 * 1_024,
622 0.0,
623 ),
624 );
625 }
626 }
627
628 Ok(levels)
629 }
630
631 #[test]
632 fn leveled_empty_levels() -> crate::Result<()> {
633 let tempdir = tempfile::tempdir()?;
634 let compactor = Strategy::default();
635
636 #[rustfmt::skip]
637 let levels = build_levels(tempdir.path(), vec![
638 vec![],
639 vec![],
640 vec![],
641 vec![],
642 ])?;
643
644 assert_eq!(
645 compactor.choose(&levels, &Config::default()),
646 Choice::DoNothing
647 );
648
649 Ok(())
650 }
651
652 #[test]
653 fn leveled_default_l0() -> crate::Result<()> {
654 let tempdir = tempfile::tempdir()?;
655 let compactor = Strategy {
656 target_size: 64 * 1_024 * 1_024,
657 ..Default::default()
658 };
659
660 #[rustfmt::skip]
661 let mut levels = build_levels(tempdir.path(), vec![
662 vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
663 vec![],
664 vec![],
665 vec![],
666 ])?;
667
668 assert_eq!(
669 compactor.choose(&levels, &Config::default()),
670 Choice::Merge(CompactionInput {
671 dest_level: 1,
672 segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
673 target_size: 64 * 1_024 * 1_024
674 })
675 );
676
677 levels.hide_segments(std::iter::once(4));
678
679 assert_eq!(
680 compactor.choose(&levels, &Config::default()),
681 Choice::DoNothing
682 );
683
684 Ok(())
685 }
686
687 #[test]
688 #[allow(
689 clippy::cast_sign_loss,
690 clippy::cast_precision_loss,
691 clippy::cast_possible_truncation
692 )]
693 fn leveled_intra_l0() -> crate::Result<()> {
694 let tempdir = tempfile::tempdir()?;
695 let compactor = Strategy {
696 target_size: 64 * 1_024 * 1_024,
697 ..Default::default()
698 };
699
700 #[rustfmt::skip]
701 let mut levels = build_levels(tempdir.path(), vec![
702 vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
703 vec![],
704 vec![],
705 vec![],
706 ])?;
707
708 assert_eq!(
709 compactor.choose(&levels, &Config::default()),
710 Choice::Merge(CompactionInput {
711 dest_level: 0,
712 segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
713 target_size: u64::from(compactor.target_size),
714 })
715 );
716
717 levels.hide_segments(std::iter::once(4));
718
719 assert_eq!(
720 compactor.choose(&levels, &Config::default()),
721 Choice::DoNothing
722 );
723
724 Ok(())
725 }
726
727 #[test]
728 fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
729 let tempdir = tempfile::tempdir()?;
730 let compactor = Strategy {
731 target_size: 64 * 1_024 * 1_024,
732 ..Default::default()
733 };
734
735 #[rustfmt::skip]
736 let levels = build_levels(tempdir.path(), vec![
737 vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
738 vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
739 vec![],
740 vec![],
741 ])?;
742
743 assert_eq!(
744 compactor.choose(&levels, &Config::default()),
745 Choice::Merge(CompactionInput {
746 dest_level: 1,
747 segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
748 target_size: 64 * 1_024 * 1_024
749 })
750 );
751
752 Ok(())
753 }
754
755 #[test]
756 fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
757 let tempdir = tempfile::tempdir()?;
758 let compactor = Strategy {
759 target_size: 64 * 1_024 * 1_024,
760 ..Default::default()
761 };
762
763 #[rustfmt::skip]
764 let mut levels = build_levels(tempdir.path(), vec![
765 vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
766 vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
767 vec![],
768 vec![],
769 ])?;
770
771 assert_eq!(
772 compactor.choose(&levels, &Config::default()),
773 Choice::Merge(CompactionInput {
774 dest_level: 1,
775 segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
776 target_size: 64 * 1_024 * 1_024
777 })
778 );
779
780 levels.hide_segments(std::iter::once(5));
781 assert_eq!(
782 compactor.choose(&levels, &Config::default()),
783 Choice::DoNothing
784 );
785
786 Ok(())
787 }
788
789 #[test]
790 fn levelled_from_tiered() -> crate::Result<()> {
791 let tempdir = tempfile::tempdir()?;
792 let compactor = Strategy {
793 target_size: 64 * 1_024 * 1_024,
794 ..Default::default()
795 };
796 let config = Config::default();
797
798 #[rustfmt::skip]
799 let levels = build_levels(tempdir.path(), vec![
800 vec![],
801 vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
802 vec![(4, "a", "g", 64)],
803 vec![],
804 ])?;
805
806 assert_eq!(
807 compactor.choose(&levels, &config),
808 Choice::Merge(CompactionInput {
809 dest_level: 2,
810 segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
811 target_size: 64 * 1_024 * 1_024
812 })
813 );
814
815 Ok(())
816 }
817}
818 */