lsm_tree/compaction/leveled.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::{util::aggregate_run_key_range, Table},
    version::{Run, Version},
    HashSet, TableId,
};

#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";

/// Tries to find the best compaction set from one level into the next,
/// returning the chosen table IDs and whether they can be trivially moved.
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    _overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
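    // A trivial move relocates tables into the next level without rewriting
    // them, which is only possible if they overlap nothing in that level.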
    // NOTE: Find largest trivial move (if it exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap at ~50 tables' worth of data per compaction for now
                //
                // At this point, all compactions are too large anyway,
                // so we can escape early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull all contained tables in the current level into the compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                // let compaction_bytes = curr_level_size + next_level_size;

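                // Approximate write amplification of this candidate: bytes that
                // would be rewritten in the next level per byte of input taken
                // from the current level.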
                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

/// Leveled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 1 can hold up to `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
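///
/// # Example
///
/// A minimal configuration sketch with the documented defaults made explicit.
/// The import path is an assumption here; adjust it to however the crate
/// exposes this type.
///
/// ```ignore
/// use lsm_tree::compaction::leveled::Strategy;
///
/// // 64 MiB target table size, compact L0 once it holds 4 tables,
/// // and let each level grow 10x larger than the previous one.
/// let strategy = Strategy::default()
///     .with_table_target_size(64 * 1_024 * 1_024)
///     .with_l0_threshold(4)
///     .with_level_ratio_policy(vec![10.0]);
/// ```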
#[derive(Clone)]
pub struct Strategy {
    l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
    ///
    /// The first entry is the ratio from L1 to L2, the second from L2 to L3,
    /// and so on; if the policy has fewer entries than levels, the last entry
    /// is reused for the remaining levels.
    ///
    /// Same as `set_max_bytes_for_level_multiplier` in `RocksDB`.
    ///
    /// Default = [10.0]
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Sets the L0 threshold.
    ///
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    ///
    /// Default = 4
    #[must_use]
    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
        self.l0_threshold = threshold;
        self
    }

    /// Sets the table target size on disk (possibly compressed).
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    ///
    /// Default = 64 MiB
    #[must_use]
    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
        self.target_size = bytes;
        self
    }

    /// Calculates the target size of L1 (the level base size).
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
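    ///
    /// For example, with the defaults (64 MiB table target size, L0 threshold 4,
    /// ratio 10), the base size is 256 MiB, so L2 targets 2.5 GiB and L3 targets
    /// 25 GiB.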
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            // u64::from(self.target_size)
            self.level_base_size()
        } else {
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because |{L0, L1}|
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            size as u64
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

                    let mut v = vec![];

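                    // Wire format: one u8 element count, followed by each ratio
                    // as a little-endian f32.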
                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level idx to the canonical
        let mut level_shift = canonical_l1_idx - 1;
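        // e.g. if the first non-empty level below L0 is level 3, then level 3 is
        // treated as the canonical L1 and `level_shift` is 2.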

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Don't count tables that are already being compacted,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move the canonical L1 up one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Get overlapping tables in next level
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
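        // One (score, overshoot in bytes) entry per level; a score >= 1.0 means
        // the level exceeds its target size and is a candidate for compaction.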
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score first level

            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
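                // e.g. 6 tables with an L0 threshold of 4 yields a score of 1.5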
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Don't count tables that are already being compacted,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check the level count above
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            /* eprintln!(
                "merge {} tables, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        /* eprintln!(
            "{} {} tables, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}

/*
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            SegmentInner,
        },
        super_segment::Segment,
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        todo!()

        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into() */
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
*/