1#[cfg(test)]
6#[allow(
7 clippy::unwrap_used,
8 clippy::indexing_slicing,
9 clippy::useless_vec,
10 clippy::unnecessary_map_or,
11 reason = "test code"
12)]
13mod test;
14
15use super::{Choice, CompactionStrategy, Input as CompactionInput};
16use crate::{
17 HashSet, TableId,
18 compaction::state::{CompactionState, hidden_set::HiddenSet},
19 config::Config,
20 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
21 table::{Table, util::aggregate_run_key_range},
22 version::{Level, Version, run::Ranged},
23};
24#[cfg(not(feature = "std"))]
25use alloc::vec::Vec;
26
27fn pick_minimal_compaction(
32 curr_level: &Level,
33 next_level: &Level,
34 hidden_set: &HiddenSet,
35 _overshoot: u64,
36 table_base_size: u64,
37 cmp: &dyn crate::comparator::UserComparator,
38) -> Option<(HashSet<TableId>, bool)> {
39 for curr_run in curr_level.iter() {
43 if let Some(window) = curr_run.shrinking_windows().find(|window| {
44 if hidden_set.is_blocked(window.iter().map(Table::id)) {
45 return false;
46 }
47
48 if next_level.is_empty() {
49 return true;
50 }
51
52 let key_range = aggregate_run_key_range(window);
53
54 next_level
56 .iter()
57 .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
58 }) {
59 let ids = window.iter().map(Table::id).collect();
60 return Some((ids, true));
61 }
62 }
63
64 if next_level.is_empty() {
68 return None;
69 }
70
71 next_level
72 .iter()
73 .flat_map(|run| {
74 run.growing_windows().take_while(|window| {
78 let size = window.iter().map(Table::file_size).sum::<u64>();
79 size <= (50 * table_base_size)
80 })
81 })
82 .filter_map(|window| {
83 if hidden_set.is_blocked(window.iter().map(Table::id)) {
84 return None;
85 }
86
87 let key_range = aggregate_run_key_range(window);
88
89 let curr_level_pull_in: Vec<&Table> = curr_level
91 .iter()
92 .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
93 .collect();
94
95 let curr_level_size = curr_level_pull_in
96 .iter()
97 .map(|t| Table::file_size(t))
98 .sum::<u64>();
99
100 if curr_level_size == 0 {
101 return None;
102 }
103
104 if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
105 return None;
106 }
107
108 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
109 let compaction_bytes = curr_level_size + next_level_size;
110
111 Some((window, curr_level_pull_in, compaction_bytes))
112 })
113 .min_by_key(|(_, _, bytes)| *bytes)
114 .map(|(window, curr_level_pull_in, _)| {
115 let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
116 ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
117 (ids, false)
118 })
119}
120
121#[doc(hidden)]
122pub const NAME: &str = "LeveledCompaction";
123
124#[derive(Clone)]
136pub struct Strategy {
137 l0_threshold: u8,
138
139 target_size: u64,
141
142 level_ratio_policy: Vec<f32>,
144
145 dynamic: bool,
152
153 multi_level: bool,
159}
160
161impl Default for Strategy {
162 fn default() -> Self {
163 Self {
164 l0_threshold: 4,
165 target_size:64 * 1_024 * 1_024,
166 level_ratio_policy: vec![10.0],
167 dynamic: false,
168 multi_level: false,
169 }
170 }
171}
172
173impl Strategy {
174 #[must_use]
180 pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
181 self.level_ratio_policy = policy;
182 self
183 }
184
185 #[must_use]
194 pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
195 self.l0_threshold = threshold;
196 self
197 }
198
199 #[must_use]
205 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
206 self.target_size = bytes;
207 self
208 }
209
210 #[must_use]
221 pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
222 self.dynamic = enabled;
223 self
224 }
225
226 #[must_use]
235 pub fn with_multi_level(mut self, enabled: bool) -> Self {
236 self.multi_level = enabled;
237 self
238 }
239
240 fn level_base_size(&self) -> u64 {
242 self.target_size * u64::from(self.l0_threshold)
243 }
244
245 fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
255 assert!(
256 canonical_level_idx >= 1,
257 "level_target_size does not apply to L0",
258 );
259
260 if canonical_level_idx == 1 {
261 self.level_base_size()
263 } else {
264 #[expect(
265 clippy::cast_precision_loss,
266 reason = "precision loss is acceptable for level size calculations"
267 )]
268 let mut size = self.level_base_size() as f32;
269
270 for idx in 0..=(canonical_level_idx - 2) {
272 let ratio = self
273 .level_ratio_policy
274 .get(usize::from(idx))
275 .copied()
276 .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
277
278 size *= ratio;
279 }
280
281 #[expect(
282 clippy::cast_possible_truncation,
283 clippy::cast_sign_loss,
284 reason = "size is always positive and will never even come close to u64::MAX"
285 )]
286 {
287 size as u64
288 }
289 }
290 }
291
292 fn compute_level_targets(
300 &self,
301 version: &Version,
302 level_shift: usize,
303 state: &CompactionState,
304 ) -> [u64; 7] {
305 let mut targets = [u64::MAX; 7];
306
307 targets[0] = u64::MAX;
309
310 if self.dynamic {
311 let mut lmax_idx = None;
315
316 for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
317 if !lvl.is_empty() {
318 lmax_idx = Some(idx);
319 }
320 }
321
322 if let Some(lmax_idx) = lmax_idx {
323 #[expect(
324 clippy::expect_used,
325 reason = "lmax_idx was found by iterating levels, so it must exist"
326 )]
327 let lmax_level = version.level(lmax_idx).expect("level should exist");
328
329 let lmax_size: u64 = lmax_level
330 .iter()
331 .flat_map(|run| run.iter())
332 .filter(|table| !state.hidden_set().is_hidden(table.id()))
333 .map(Table::file_size)
334 .sum();
335
336 if let Some(slot) = targets.get_mut(lmax_idx) {
338 *slot = lmax_size;
339 }
340
341 #[expect(
342 clippy::cast_precision_loss,
343 reason = "precision loss is acceptable for level size calculations"
344 )]
345 let mut current_target = lmax_size as f64;
346
347 let dynamic_l1_idx = level_shift + 1;
351
352 for idx in (dynamic_l1_idx..lmax_idx).rev() {
353 let canonical = idx - level_shift;
354 let ratio_idx = canonical.saturating_sub(1);
359 let ratio = f64::from(
360 self.level_ratio_policy
361 .get(ratio_idx)
362 .copied()
363 .unwrap_or_else(|| {
364 self.level_ratio_policy.last().copied().unwrap_or(10.0)
365 }),
366 );
367
368 if !ratio.is_finite() || ratio <= 0.0 {
372 return self.compute_static_targets(level_shift);
373 }
374
375 current_target /= ratio;
376
377 #[expect(
378 clippy::cast_possible_truncation,
379 clippy::cast_sign_loss,
380 reason = "target is always positive"
381 )]
382 if let Some(slot) = targets.get_mut(idx) {
383 *slot = current_target as u64;
384 }
385 }
386
387 let static_l1 = self.level_base_size();
390 if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
391 return self.compute_static_targets(level_shift);
392 }
393
394 return targets;
395 }
396 }
397
398 self.compute_static_targets(level_shift)
399 }
400
401 fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
403 let mut targets = [u64::MAX; 7];
404
405 for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
406 if idx <= level_shift {
407 continue; }
409 #[expect(
410 clippy::cast_possible_truncation,
411 reason = "level index is bounded by level count (7)"
412 )]
413 {
414 *slot = self.level_target_size((idx - level_shift) as u8);
415 }
416 }
417
418 targets
419 }
420}
421
422impl CompactionStrategy for Strategy {
423 fn get_name(&self) -> &'static str {
424 NAME
425 }
426
427 fn get_config(&self) -> Vec<crate::KvPair> {
428 vec![
429 (
430 crate::UserKey::from("leveled_l0_threshold"),
431 crate::UserValue::from(self.l0_threshold.to_le_bytes()),
432 ),
433 (
434 crate::UserKey::from("leveled_target_size"),
435 crate::UserValue::from(self.target_size.to_le_bytes()),
436 ),
437 (
438 crate::UserKey::from("leveled_level_ratio_policy"),
439 crate::UserValue::from({
440 use crate::io::{LittleEndian, WriteBytesExt};
441
442 let mut v = vec![];
443
444 #[expect(
445 clippy::expect_used,
446 clippy::cast_possible_truncation,
447 reason = "writing into Vec should not fail; policies have length of 255 max"
448 )]
449 v.write_u8(self.level_ratio_policy.len() as u8)
450 .expect("cannot fail");
451
452 for &f in &self.level_ratio_policy {
453 #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
454 v.write_f32::<LittleEndian>(f).expect("cannot fail");
455 }
456
457 v
458 }),
459 ),
460 (
461 crate::UserKey::from("leveled_dynamic"),
462 crate::UserValue::from([u8::from(self.dynamic)]),
463 ),
464 (
465 crate::UserKey::from("leveled_multi_level"),
466 crate::UserValue::from([u8::from(self.multi_level)]),
467 ),
468 ]
469 }
470
471 fn pending_compaction_bytes(&self, version: &Version) -> u64 {
472 let targets = self.compute_level_targets(version, 0, &CompactionState::default());
475 let mut debt = 0u64;
476 for (idx, level) in version.iter_levels().enumerate() {
477 if idx == 0 {
481 if level.table_count() >= usize::from(self.l0_threshold) {
486 debt += level.size();
487 }
488 } else if let Some(&target) = targets.get(idx) {
489 debt += level.size().saturating_sub(target);
492 }
493 }
494 debt
495 }
496
497 #[expect(clippy::too_many_lines)]
498 fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
499 assert!(
506 version.level_count() == usize::from(config.level_count),
507 "leveled compaction requires version.level_count() == config.level_count (got version={}, config={})",
508 version.level_count(),
509 config.level_count,
510 );
511 let cmp = config.comparator.as_ref();
512
513 'trivial_lmax: {
515 #[expect(
516 clippy::expect_used,
517 reason = "level 0 is guaranteed to exist in a valid version"
518 )]
519 let l0 = version.level(0).expect("first level should exist");
520
521 if !l0.is_empty() && l0.is_disjoint() {
522 let lmax_index = version.level_count() - 1;
523
524 if (1..lmax_index).any(|idx| {
525 #[expect(
526 clippy::expect_used,
527 reason = "levels within level_count are guaranteed to exist"
528 )]
529 let level = version.level(idx).expect("level should exist");
530 !level.is_empty()
531 }) {
532 break 'trivial_lmax;
534 }
535
536 #[expect(
537 clippy::expect_used,
538 reason = "lmax_index is derived from level_count so level is guaranteed to exist"
539 )]
540 let lmax = version.level(lmax_index).expect("last level should exist");
541
542 if !lmax
543 .aggregate_key_range_cmp(cmp)
544 .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
545 {
546 return Choice::Move(CompactionInput {
547 table_ids: l0.list_ids(),
548 #[expect(
549 clippy::cast_possible_truncation,
550 reason = "level count is at most 7, fits in u8"
551 )]
552 dest_level: lmax_index as u8,
553 canonical_level: 1,
554 target_size: self.target_size,
555 });
556 }
557 }
558 }
559
560 #[expect(clippy::map_unwrap_or)]
562 let first_non_empty_level = version
563 .iter_levels()
564 .enumerate()
565 .skip(1)
566 .find(|(_, lvl)| !lvl.is_empty())
567 .map(|(idx, _)| idx)
568 .unwrap_or_else(|| version.level_count() - 1);
569
570 let mut canonical_l1_idx = first_non_empty_level;
571
572 let mut level_shift = canonical_l1_idx - 1;
574
575 if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
576 let need_new_l1 = version
577 .iter_levels()
578 .enumerate()
579 .skip(1)
580 .filter(|(_, lvl)| !lvl.is_empty())
581 .all(|(idx, level)| {
582 let level_size = level
583 .iter()
584 .flat_map(|x| x.iter())
585 .filter(|x| !state.hidden_set().is_hidden(x.id()))
588 .map(Table::file_size)
589 .sum::<u64>();
590
591 #[expect(
592 clippy::cast_possible_truncation,
593 reason = "level index is bounded by level count (7, technically 255)"
594 )]
595 let target_size = self.level_target_size((idx - level_shift) as u8);
596
597 level_size > target_size
598 });
599
600 if need_new_l1 {
602 canonical_l1_idx -= 1;
603 level_shift -= 1;
604 }
605 }
606
607 'trivial: {
609 let first_level = version.l0();
610 let target_level_idx = first_non_empty_level.min(canonical_l1_idx);
611
612 if first_level.run_count() == 1 {
613 if version.level_is_busy(0, state.hidden_set())
614 || version.level_is_busy(target_level_idx, state.hidden_set())
615 {
616 break 'trivial;
617 }
618
619 let Some(target_level) = &version.level(target_level_idx) else {
620 break 'trivial;
621 };
622
623 if target_level.run_count() != 1 {
624 break 'trivial;
625 }
626
627 let key_range = first_level.aggregate_key_range_cmp(cmp);
628
629 let get_overlapping = target_level
631 .iter()
632 .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
633 .map(Table::id)
634 .next();
635
636 if get_overlapping.is_none() && first_level.is_disjoint() {
637 #[expect(
638 clippy::cast_possible_truncation,
639 reason = "level index is bounded by level count (7)"
640 )]
641 return Choice::Move(CompactionInput {
642 table_ids: first_level.list_ids(),
643 dest_level: target_level_idx as u8,
644 canonical_level: 1,
645 target_size: self.target_size,
646 });
647 }
648 }
649 }
650
651 {
654 let first_level = version.l0();
655
656 if first_level.run_count() > 1
657 && first_level.table_count() < usize::from(self.l0_threshold)
658 && !version.level_is_busy(0, state.hidden_set())
659 {
660 return Choice::Merge(CompactionInput {
661 table_ids: first_level.list_ids(),
662 dest_level: 0,
663 canonical_level: 0,
664 target_size: self.target_size,
665 });
666 }
667 }
668
669 let level_targets = self.compute_level_targets(version, level_shift, state);
671
672 let mut scores = [(0.0, 0u64); 7];
674
675 {
676 let first_level = version.l0();
682
683 if first_level.table_count() >= usize::from(self.l0_threshold) {
684 #[expect(
685 clippy::cast_precision_loss,
686 reason = "precision loss is acceptable for scoring calculations"
687 )]
688 let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
689 scores[0] = (ratio, 0);
690 }
691
692 for (idx, level) in version.iter_levels().enumerate().skip(1) {
694 if level.is_empty() {
695 continue;
696 }
697
698 let level_size = level
699 .iter()
700 .flat_map(|x| x.iter())
701 .filter(|x| !state.hidden_set().is_hidden(x.id()))
704 .map(Table::file_size)
705 .sum::<u64>();
706
707 #[expect(clippy::indexing_slicing)]
709 let target_size = level_targets[idx];
710
711 #[expect(
712 clippy::indexing_slicing,
713 reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
714 )]
715 if level_size > target_size {
716 #[expect(
717 clippy::cast_precision_loss,
718 reason = "precision loss is acceptable for scoring calculations"
719 )]
720 let score = level_size as f64 / target_size as f64;
721 scores[idx] = (score, level_size - target_size);
722
723 if version
725 .level(idx + 1)
726 .is_some_and(|next_level| next_level.is_empty())
727 {
728 scores[idx] = (99.99, 999);
729 }
730 }
731 }
732
733 {
735 scores[6] = (0.0, 0);
736 }
737 }
738
739 #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
741 let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
742 .into_iter()
743 .enumerate()
744 .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
745 score_a
746 .partial_cmp(score_b)
747 .unwrap_or(core::cmp::Ordering::Equal)
748 })
749 .expect("should have highest score somewhere");
750
751 if score < 1.0 {
752 return Choice::DoNothing;
753 }
754
755 if level_idx_with_highest_score == 0 {
757 let Some(first_level) = version.level(0) else {
758 return Choice::DoNothing;
759 };
760
761 if version.level_is_busy(0, state.hidden_set())
762 || version.level_is_busy(canonical_l1_idx, state.hidden_set())
763 {
764 return Choice::DoNothing;
765 }
766
767 let Some(target_level) = &version.level(canonical_l1_idx) else {
768 return Choice::DoNothing;
769 };
770
771 let mut table_ids = first_level.list_ids();
772
773 let key_range = first_level.aggregate_key_range_cmp(cmp);
774
775 let target_level_overlapping_table_ids: Vec<_> = target_level
777 .iter()
778 .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
779 .map(Table::id)
780 .collect();
781
782 table_ids.extend(&target_level_overlapping_table_ids);
783
784 if self.multi_level {
791 let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
792 let l2_idx = canonical_l1_idx + 1;
793
794 if l1_score > 1.0
795 && l2_idx < version.level_count()
796 && !version.level_is_busy(l2_idx, state.hidden_set())
797 && let Some(l2) = version.level(l2_idx)
798 {
799 table_ids.extend(target_level.list_ids());
801
802 {
812 let mut input_ranges: Vec<_> = target_level
813 .iter()
814 .chain(first_level.iter())
815 .flat_map(|run| run.iter())
816 .map(|t| t.key_range().clone())
817 .collect();
818 input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));
819
820 let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);
821
822 for run in l2.iter() {
823 for interval in &merged {
824 for l2t in run.get_overlapping_cmp(interval, cmp) {
825 table_ids.insert(Table::id(l2t));
826 }
827 }
828 }
829 }
830
831 #[expect(
832 clippy::cast_possible_truncation,
833 reason = "level index is bounded by level count (7)"
834 )]
835 return Choice::Merge(CompactionInput {
836 table_ids,
837 dest_level: l2_idx as u8,
838 canonical_level: 2,
839 target_size: self.target_size,
840 });
841 }
842 }
843
844 #[expect(
845 clippy::cast_possible_truncation,
846 reason = "level index is bounded by level count (7, technically 255)"
847 )]
848 let choice = CompactionInput {
849 table_ids,
850 dest_level: canonical_l1_idx as u8,
851 canonical_level: 1,
852 target_size: self.target_size,
853 };
854
855 if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
856 return Choice::Move(choice);
857 }
858 return Choice::Merge(choice);
859 }
860
861 #[expect(clippy::cast_possible_truncation)]
865 let curr_level_index = level_idx_with_highest_score as u8;
866
867 let next_level_index = curr_level_index + 1;
868
869 let Some(level) = version.level(level_idx_with_highest_score) else {
870 return Choice::DoNothing;
871 };
872
873 let Some(next_level) = version.level(next_level_index as usize) else {
874 return Choice::DoNothing;
875 };
876
877 let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
880 level,
881 next_level,
882 state.hidden_set(),
883 overshoot_bytes,
884 self.target_size,
885 cmp,
886 ) else {
887 return Choice::DoNothing;
888 };
889
890 #[expect(
891 clippy::cast_possible_truncation,
892 reason = "level shift is bounded by level count (7, technically 255)"
893 )]
894 let choice = CompactionInput {
895 table_ids,
896 dest_level: next_level_index,
897 canonical_level: next_level_index - (level_shift as u8),
898 target_size: self.target_size,
899 };
900
901 if can_trivial_move && level.is_disjoint() {
902 return Choice::Move(choice);
903 }
904 Choice::Merge(choice)
905 }
906}