lsm_tree/compaction/leveled.rs
// Copyright (c) 2024-present, fjall-rs
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
    compaction::state::{hidden_set::HiddenSet, CompactionState},
    config::Config,
    slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
    table::Table,
    version::{run::Ranged, Run, Version},
    HashSet, KeyRange, TableId,
};

pub fn aggregate_run_key_range(tables: &[Table]) -> KeyRange {
    let lo = tables.first().expect("run should never be empty");
    let hi = tables.last().expect("run should never be empty");
    KeyRange::new((lo.key_range().min().clone(), hi.key_range().max().clone()))
}

/// Tries to find the optimal compaction set from one level into the next.
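/// First looks for the largest window of tables that can be trivially moved
/// (i.e. that does not overlap the next run at all); otherwise it searches for
/// the merge window with the lowest write amplification.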
fn pick_minimal_compaction(
    curr_run: &Run<Table>,
    next_run: Option<&Run<Table>>,
    hidden_set: &HiddenSet,
    overshoot: u64,
    table_base_size: u64,
) -> Option<(HashSet<TableId>, bool)> {
    // NOTE: Find largest trivial move (if it exists)
    if let Some(window) = curr_run.shrinking_windows().find(|window| {
        if hidden_set.is_blocked(window.iter().map(Table::id)) {
            // IMPORTANT: Compaction is blocked because of other
            // on-going compaction
            return false;
        }

        let Some(next_run) = &next_run else {
            // No run in next level, so we can trivially move
            return true;
        };

        let key_range = aggregate_run_key_range(window);

        next_run.get_overlapping(&key_range).is_empty()
    }) {
        let ids = window.iter().map(Table::id).collect();
        return Some((ids, true));
    }

    // NOTE: Look for merges
    if let Some(next_run) = &next_run {
        next_run
            .growing_windows()
            .take_while(|window| {
                // Cap at 50x tables per compaction for now
                //
                // At this point, all compactions are too large anyway
                // so we can escape early
                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
                next_level_size <= (50 * table_base_size)
            })
            .filter_map(|window| {
                if hidden_set.is_blocked(window.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let key_range = aggregate_run_key_range(window);

                // Pull in all contained tables in current level into compaction
                let curr_level_pull_in = curr_run.get_contained(&key_range);

                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();

                // if curr_level_size < overshoot {
                //     return None;
                // }

                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
                    // IMPORTANT: Compaction is blocked because of other
                    // on-going compaction
                    return None;
                }

                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

                // let compaction_bytes = curr_level_size + next_level_size;

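                // Approximate write amplification for this candidate: bytes that
                // would be rewritten in the next level per byte of data moved
                // out of the current level.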
                #[expect(clippy::cast_precision_loss)]
                let write_amp = (next_level_size as f32) / (curr_level_size as f32);

                Some((window, curr_level_pull_in, write_amp))
            })
            // Find the compaction with the smallest write amplification factor
            .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
            .map(|(window, curr_level_pull_in, _)| {
                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
                ids.extend(curr_level_pull_in.iter().map(Table::id));
                (ids, false)
            })
    } else {
        None
    }
}

/// Levelled compaction strategy (LCS)
///
/// When a level reaches some threshold size, parts of it are merged into overlapping tables in the next level.
///
/// Each level Ln for n >= 1 can hold up to `level_base_size * ratio^(n - 1)` bytes.
///
/// LCS suffers from comparatively high write amplification, but has decent read amplification and great space amplification (~1.1x).
///
/// LCS is the recommended compaction strategy to use.
///
/// More info here: <https://fjall-rs.github.io/post/lsm-leveling/>
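///
/// # Examples
///
/// A minimal usage sketch (assuming the strategy is re-exported as
/// `lsm_tree::compaction::leveled::Strategy`; adjust the path to match the
/// crate's public API):
///
/// ```ignore
/// use lsm_tree::compaction::leveled::Strategy;
///
/// // Defaults: L0 compacts at 4 tables, 64 MiB target table size, 10x fanout.
/// let strategy = Strategy::default().with_level_ratio_policy(vec![10.0]);
/// assert_eq!(4, strategy.l0_threshold);
/// ```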
#[derive(Clone)]
pub struct Strategy {
    /// When the number of tables in L0 reaches this threshold,
    /// they are merged into L1.
    ///
    /// Default = 4
    ///
    /// Same as `level0_file_num_compaction_trigger` in `RocksDB`.
    pub l0_threshold: u8,

    /// The target table size on disk (possibly compressed).
    ///
    /// Default = 64 MiB
    ///
    /// Same as `target_file_size_base` in `RocksDB`.
    pub target_size: u64,

    /// Size ratio between levels of the LSM tree (a.k.a. fanout, growth rate).
    ///
    /// This is the exponential growth factor from one
    /// level to the next.
    ///
    /// Default = 10
    pub level_ratio_policy: Vec<f32>,
}

impl Default for Strategy {
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
            level_ratio_policy: vec![10.0],
        }
    }
}

impl Strategy {
    /// Sets the growth ratio between levels.
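    ///
    /// The first entry is the ratio applied between L1 and L2, the next between
    /// L2 and L3, and so on; the last entry is reused for any deeper levels
    /// (see `level_target_size`).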
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Calculates the size of L1.
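    ///
    /// The base size is `target_size * l0_threshold`, i.e. a full L0 (at its
    /// compaction trigger) corresponds to roughly one L1's worth of data.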
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Calculates the level target size.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * ratio`
    ///
    /// L3 = `level_base_size * ratio * ratio`
    ///
    /// ...
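    ///
    /// For example, with the defaults (`target_size` = 64 MiB, `l0_threshold` = 4,
    /// ratio = 10), the base size is 256 MiB, so L1 targets 256 MiB, L2 targets
    /// 2560 MiB, L3 targets 25,600 MiB, and so on.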
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            // u64::from(self.target_size)
            self.level_base_size()
        } else {
            let mut size = self.level_base_size() as f32;

            // NOTE: Minus 2 because |{L0, L1}|
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            size as u64
        }
    }
}

impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        "LeveledCompaction"
    }

    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

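                    // Encoding layout: a u8 count, followed by each ratio as a
                    // little-endian f32.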
                    let mut v = vec![];

                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // Find the level that corresponds to L1
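        // (i.e. the shallowest non-empty level below L0, or the last level if
        // all deeper levels are empty). The "canonical" index is a level's
        // logical position once the empty levels in between are skipped over.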
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Number of levels we have to shift to get from the actual level idx to the canonical
        let mut level_shift = canonical_l1_idx - 1;

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        // NOTE: Take bytes that are already being compacted into account,
                        // otherwise we may be overcompensating
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            // Move up L1 one level if all current levels are at capacity
            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move into L1
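        //
        // If L0 consists of a single disjoint run and nothing in the target
        // level overlaps its key range, the tables can be moved down without
        // rewriting any data.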
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Get overlapping tables in next level
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Scoring
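        //
        // A score >= 1.0 means a level is over capacity: L0 is scored by table
        // count relative to `l0_threshold`, L1+ by level size relative to the
        // level's target size.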
        let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

        {
            // TODO(weak-tombstone-rewrite): incorporate `Table::weak_tombstone_count` and
            // `Table::weak_tombstone_reclaimable` when computing level scores so rewrite
            // decisions can prioritize tables that would free the most reclaimable values.

            // Score first level

            let first_level = version.l0();

            // TODO: use run_count instead? but be careful because of version free list GC thingy
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Score L1+
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    // NOTE: Take bytes that are already being compacted into account,
                    // otherwise we may be overcompensating
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                // NOTE: We check for level length above
                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // NOTE: Force a trivial move
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // NOTE: Never score Lmax
            //
            // NOTE: We check for level length above
            #[expect(clippy::indexing_slicing)]
            {
                scores[6] = (0.0, 0);
            }
        }

        // Choose compaction
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        if score < 1.0 {
            return Choice::DoNothing;
        }

        // We choose L0->L1 compaction
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            // Get overlapping tables in next level
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            /* eprintln!(
                "merge {} tables, L0->L1: {:?}",
                choice.segment_ids.len(),
                choice.segment_ids,
            ); */

            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // We choose L1+ compaction instead

        // NOTE: Level count is 255 max
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

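        // `dest_level` is the physical level slot in the version; `canonical_level`
        // is its logical position with the skipped-over empty levels removed.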
        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        /* eprintln!(
            "{} {} tables, L{}->L{next_level_index}: {:?}",
            if can_trivial_move { "move" } else { "merge" },
            choice.segment_ids.len(),
            next_level_index - 1,
            choice.segment_ids,
        ); */

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}

/*
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            SegmentInner,
        },
        super_segment::Segment,
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        todo!()

        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        }
        .into() */
    }

    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}
*/