lsm_tree/compaction/leveled.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::{util::aggregate_run_key_range, Table},
    version::{Run, Version},
    HashSet, TableId,
};

#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";

/// Tries to find the optimal compaction set from one level into the next.
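///
/// First looks for the largest window of tables that can be trivially moved
/// (no overlap with the next run, not blocked by an ongoing compaction);
/// otherwise it searches for the merge window with the lowest write amplification.
/// Returns the chosen table IDs and whether the result is a trivial move.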
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    _overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
    // NOTE: Find largest trivial move (if it exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked because of another
            // ongoing compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap at 50x the base table size per compaction for now
                //
                // At this point, all compactions are too large anyway,
                // so we can escape early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of another
                    // ongoing compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull all contained tables in the current level into the compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of another
                    // ongoing compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                // let compaction_bytes = curr_level_size + next_level_size;

                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);
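                // For instance, pulling 64 MiB from the current level into 256 MiB of
                // overlapping tables in the next level gives a write amp factor of 4.0,
                // while a window overlapping only 32 MiB scores 0.5 and is preferred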

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 2 can hold up to `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
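///
/// For example, with the defaults (`target_size` = 64 MiB, `l0_threshold` = 4,
/// ratio 10), L1 targets 256 MiB, L2 about 2.5 GiB, L3 about 25 GiB, and so on.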
#[derive(Clone)]
pub struct Strategy {
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate)
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// Default = 10
    pub level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
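    ///
    /// Each entry is the ratio for one level transition (L1 -> L2, L2 -> L3, ...);
    /// if the policy is shorter than the level count, the last entry is reused.
    /// For example, a hypothetical policy of `vec![5.0, 10.0]` makes L2 five times
    /// as large as L1 and every deeper level ten times as large as the one above it.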
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Calculates the size of L1.
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
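    ///
    /// The ratio for each step is taken from `level_ratio_policy`: index 0 is used
    /// for L1 -> L2, index 1 for L2 -> L3, and so on, falling back to the last
    /// entry (or 10.0 if the policy is empty) once the policy runs out.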
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            // u64::from(self.target_size)
            self.level_base_size()
        } else {
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because |{L0, L1}|
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            size as u64
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

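                    // Wire format: a u8 entry count, followed by each ratio
                    // encoded as a little-endian f32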
                    let mut v = vec![];

                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level idx to the canonical one
        let mut level_shift = canonical_l1_idx - 1;
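        // e.g. if L1..L3 are empty and L4 is the first non-empty level, then
        // canonical_l1_idx = 4 and level_shift = 3, so actual level 4 is sized
        // as the canonical L1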

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Take bytes that are already being compacted into account,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move up L1 one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
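        //
        // A `Choice::Move` re-links tables into the target level without rewriting
        // any data, so it is only attempted when nothing in the target level
        // overlaps the moved key range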
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Get overlapping tables in next level
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score first level

            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }
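            // e.g. 6 tables in L0 with an `l0_threshold` of 4 yields a score of 1.5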

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Take bytes that are already being compacted into account,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check for level length above
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );
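                    // e.g. a level holding 320 MiB against a 256 MiB target
                    // scores 1.25 with a 64 MiB overshoot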

                    // NOTE: Force a trivial move
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            /* eprintln!(
                "merge {} tables, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        /* eprintln!(
            "{} {} tables, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}

/*
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            SegmentInner,
        },
        super_segment::Segment,
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        todo!()

        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into() */
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
*/