1#[cfg(test)]
6mod test;
7
8use super::{Choice, CompactionStrategy, Input as CompactionInput};
9use crate::{
10 compaction::state::{hidden_set::HiddenSet, CompactionState},
11 config::Config,
12 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
13 table::{util::aggregate_run_key_range, Table},
14 version::{run::Ranged, Level, Version},
15 HashSet, TableId,
16};
17
/// Picks the cheapest compaction that moves data from `curr_level` into `next_level`.
///
/// Two phases:
///
/// 1. **Trivial move**: look for a window of tables in a run of `curr_level`
///    that is not blocked by in-flight compactions and does not overlap any
///    run of `next_level` (or `next_level` is empty). Such a window can be
///    moved without rewriting; returns `(ids, true)`.
///
/// 2. **Cheapest merge**: otherwise, scan windows of `next_level` tables
///    (capped at 50x `table_base_size`), pull in the `curr_level` tables fully
///    contained in each window's aggregated key range, and pick the candidate
///    minimizing the total bytes to rewrite; returns `(ids, false)`.
///
/// Returns `None` if no candidate exists (e.g. everything is blocked by the
/// hidden set, or no window pulls any bytes down from `curr_level`).
///
/// NOTE(review): `_overshoot` is currently unused.
fn pick_minimal_compaction(
    curr_level: &Level,
    next_level: &Level,
    hidden_set: &HiddenSet,
    _overshoot: u64,
    table_base_size: u64,
    cmp: &dyn crate::comparator::UserComparator,
) -> Option<(HashSet<TableId>, bool)> {
    // Phase 1: try to find a window in the current level that can be moved
    // down without rewriting any data.
    for curr_run in curr_level.iter() {
        if let Some(window) = curr_run.shrinking_windows().find(|window| {
            // Tables that are part of an in-flight compaction cannot be touched
            if hidden_set.is_blocked(window.iter().map(Table::id)) {
                return false;
            }

            // Empty next level: nothing can overlap, any unblocked window works
            if next_level.is_empty() {
                return true;
            }

            let key_range = aggregate_run_key_range(window);

            // Movable only if no run in the next level overlaps the window
            next_level
                .iter()
                .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
        }) {
            let ids = window.iter().map(Table::id).collect();
            // `true`: caller may perform a trivial move
            return Some((ids, true));
        }
    }

    // No trivial move possible and nothing to merge into
    if next_level.is_empty() {
        return None;
    }

    // Phase 2: choose the merge candidate with the smallest write amplification.
    next_level
        .iter()
        .flat_map(|run| {
            run.growing_windows().take_while(|window| {
                // Cap the candidate window size to bound compaction work
                let size = window.iter().map(Table::file_size).sum::<u64>();
                size <= (50 * table_base_size)
            })
        })
        .filter_map(|window| {
            if hidden_set.is_blocked(window.iter().map(Table::id)) {
                return None;
            }

            let key_range = aggregate_run_key_range(window);

            // Current-level tables fully contained in the window's key range
            let curr_level_pull_in: Vec<&Table> = curr_level
                .iter()
                .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
                .collect();

            let curr_level_size = curr_level_pull_in
                .iter()
                .map(|t| Table::file_size(t))
                .sum::<u64>();

            // A merge that pulls nothing down from the current level is useless
            if curr_level_size == 0 {
                return None;
            }

            // The pulled-in tables must not be blocked either
            if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
                return None;
            }

            let next_level_size = window.iter().map(Table::file_size).sum::<u64>();

            // Total bytes rewritten by this candidate compaction
            let compaction_bytes = curr_level_size + next_level_size;

            Some((window, curr_level_pull_in, compaction_bytes))
        })
        .min_by_key(|(_, _, bytes)| *bytes)
        .map(|(window, curr_level_pull_in, _)| {
            let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
            ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
            // `false`: this is a real merge, not a trivial move
            (ids, false)
        })
}
111
/// Strategy name, as returned by [`CompactionStrategy::get_name`].
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
114
/// Leveled compaction strategy.
///
/// Levels have exponentially growing target sizes (see
/// `level_target_size`); once a level's live size exceeds its target, tables
/// are merged into the next level.
#[derive(Clone)]
pub struct Strategy {
    /// Number of L0 tables that makes L0 eligible for compaction
    /// (L0 is scored by table count, not size — see `choose`).
    l0_threshold: u8,

    /// Target file size of output tables; also the unit for the L1 base size
    /// (`level_base_size` = `target_size * l0_threshold`).
    target_size: u64,

    /// Size growth ratios between consecutive canonical levels; the last
    /// entry is reused for all deeper levels (10.0 if the vec is empty).
    level_ratio_policy: Vec<f32>,

    /// Whether to derive level targets dynamically from the size of the
    /// deepest non-empty level (see `compute_level_targets`).
    dynamic: bool,

    /// Whether an L0 compaction may be extended to also merge into L2 when
    /// L1 would overflow (see the multi-level branch in `choose`).
    multi_level: bool,
}
151
152impl Default for Strategy {
153 fn default() -> Self {
154 Self {
155 l0_threshold: 4,
156 target_size:64 * 1_024 * 1_024,
157 level_ratio_policy: vec![10.0],
158 dynamic: false,
159 multi_level: false,
160 }
161 }
162}
163
impl Strategy {
    /// Sets the level ratio policy.
    ///
    /// Entry `i` is the size multiplier applied going from canonical level
    /// `i + 1` to level `i + 2`; the last entry is reused for all deeper
    /// levels (10.0 if the policy is empty).
    #[must_use]
    pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
        self.level_ratio_policy = policy;
        self
    }

    /// Sets the number of L0 tables that triggers an L0 compaction.
    #[must_use]
    pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
        self.l0_threshold = threshold;
        self
    }

    /// Sets the target file size (in bytes) for tables written by compactions.
    #[must_use]
    pub fn with_table_target_size(mut self, bytes: u64) -> Self {
        self.target_size = bytes;
        self
    }

    /// Enables or disables dynamic level targets derived from the size of
    /// the deepest non-empty level (see `compute_level_targets`).
    #[must_use]
    pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
        self.dynamic = enabled;
        self
    }

    /// Enables or disables multi-level compactions (L0 compactions that may
    /// additionally pull L1 and overlapping L2 tables in one merge).
    #[must_use]
    pub fn with_multi_level(mut self, enabled: bool) -> Self {
        self.multi_level = enabled;
        self
    }

    /// Base (canonical L1) target size: one full L0 worth of tables.
    fn level_base_size(&self) -> u64 {
        self.target_size * u64::from(self.l0_threshold)
    }

    /// Static target size for a canonical level (1-based; panics for L0).
    ///
    /// L1 gets the base size; each deeper level multiplies by the configured
    /// ratio for that depth (last policy entry reused, 10.0 as fallback).
    fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
        assert!(
            canonical_level_idx >= 1,
            "level_target_size does not apply to L0",
        );

        if canonical_level_idx == 1 {
            self.level_base_size()
        } else {
            #[expect(
                clippy::cast_precision_loss,
                reason = "precision loss is acceptable for level size calculations"
            )]
            let mut size = self.level_base_size() as f32;

            // Apply one ratio per level step above L1
            for idx in 0..=(canonical_level_idx - 2) {
                let ratio = self
                    .level_ratio_policy
                    .get(usize::from(idx))
                    .copied()
                    .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));

                size *= ratio;
            }

            #[expect(
                clippy::cast_possible_truncation,
                clippy::cast_sign_loss,
                reason = "size is always positive and will never even come close to u64::MAX"
            )]
            {
                size as u64
            }
        }
    }

    /// Computes the per-level target sizes, indexed by absolute level (0..7).
    ///
    /// When `dynamic` is set, targets are derived backwards from the live
    /// size of the deepest non-empty level by repeatedly dividing by the
    /// level ratio. Falls back to static targets if a ratio is invalid
    /// (non-finite or <= 0) or the dynamic L1 target would undershoot the
    /// static base size.
    fn compute_level_targets(
        &self,
        version: &Version,
        level_shift: usize,
        state: &CompactionState,
    ) -> [u64; 7] {
        let mut targets = [u64::MAX; 7];

        // L0 is scored by table count, never by size
        targets[0] = u64::MAX;

        if self.dynamic {
            // Find the deepest non-empty level (Lmax)
            let mut lmax_idx = None;

            for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
                if !lvl.is_empty() {
                    lmax_idx = Some(idx);
                }
            }

            if let Some(lmax_idx) = lmax_idx {
                #[expect(
                    clippy::expect_used,
                    reason = "lmax_idx was found by iterating levels, so it must exist"
                )]
                let lmax_level = version.level(lmax_idx).expect("level should exist");

                // Live size of Lmax, ignoring tables hidden by in-flight compactions
                let lmax_size: u64 = lmax_level
                    .iter()
                    .flat_map(|run| run.iter())
                    .filter(|table| !state.hidden_set().is_hidden(table.id()))
                    .map(Table::file_size)
                    .sum();

                if let Some(slot) = targets.get_mut(lmax_idx) {
                    *slot = lmax_size;
                }

                #[expect(
                    clippy::cast_precision_loss,
                    reason = "precision loss is acceptable for level size calculations"
                )]
                let mut current_target = lmax_size as f64;

                // First level below L0 once the level shift is applied
                let dynamic_l1_idx = level_shift + 1;

                // Walk upwards from Lmax, dividing by the ratio at each step
                for idx in (dynamic_l1_idx..lmax_idx).rev() {
                    let canonical = idx - level_shift;
                    let ratio_idx = canonical.saturating_sub(1);
                    let ratio = f64::from(
                        self.level_ratio_policy
                            .get(ratio_idx)
                            .copied()
                            .unwrap_or_else(|| {
                                self.level_ratio_policy.last().copied().unwrap_or(10.0)
                            }),
                    );

                    // Degenerate ratio would produce nonsense targets
                    if !ratio.is_finite() || ratio <= 0.0 {
                        return self.compute_static_targets(level_shift);
                    }

                    current_target /= ratio;

                    #[expect(
                        clippy::cast_possible_truncation,
                        clippy::cast_sign_loss,
                        reason = "target is always positive"
                    )]
                    if let Some(slot) = targets.get_mut(idx) {
                        *slot = current_target as u64;
                    }
                }

                // Dynamic targets must not undershoot the static L1 base size
                let static_l1 = self.level_base_size();
                if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
                    return self.compute_static_targets(level_shift);
                }

                return targets;
            }
        }

        self.compute_static_targets(level_shift)
    }

    /// Computes static per-level targets; levels at or above the shift keep
    /// `u64::MAX` (never considered oversized).
    fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
        let mut targets = [u64::MAX; 7];

        for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
            if idx <= level_shift {
                continue;
            }
            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level count (7)"
            )]
            {
                *slot = self.level_target_size((idx - level_shift) as u8);
            }
        }

        targets
    }
}
410
impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Serializes the strategy configuration as key-value pairs
    /// (integers little-endian; the ratio policy as a u8 length prefix
    /// followed by little-endian f32s; bools as a single byte).
    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

                    let mut v = vec![];

                    #[expect(
                        clippy::expect_used,
                        clippy::cast_possible_truncation,
                        reason = "writing into Vec should not fail; policies have length of 255 max"
                    )]
                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
            (
                crate::UserKey::from("leveled_dynamic"),
                crate::UserValue::from([u8::from(self.dynamic)]),
            ),
            (
                crate::UserKey::from("leveled_multi_level"),
                crate::UserValue::from([u8::from(self.multi_level)]),
            ),
        ]
    }

    /// Chooses the next compaction, if any. Decision order:
    ///
    /// 1. Trivially move all of L0 into the last level, when all intermediate
    ///    levels are empty and the key ranges do not overlap.
    /// 2. Trivially move a single-run, disjoint L0 into the first occupied
    ///    level when there is no key-range overlap.
    /// 3. Merge L0's runs together (dest L0) while still under the L0
    ///    table-count threshold.
    /// 4. Otherwise score every level against its target size and compact the
    ///    highest-scoring level into the level below it.
    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");
        let cmp = config.comparator.as_ref();

        // Fast path: only L0 and the last level hold data, and they don't
        // overlap -> move L0 straight to the last level.
        'trivial_lmax: {
            #[expect(
                clippy::expect_used,
                reason = "level 0 is guaranteed to exist in a valid version"
            )]
            let l0 = version.level(0).expect("first level should exist");

            if !l0.is_empty() && l0.is_disjoint() {
                let lmax_index = version.level_count() - 1;

                // Any occupied intermediate level disqualifies this fast path
                if (1..lmax_index).any(|idx| {
                    #[expect(
                        clippy::expect_used,
                        reason = "levels within level_count are guaranteed to exist"
                    )]
                    let level = version.level(idx).expect("level should exist");
                    !level.is_empty()
                }) {
                    break 'trivial_lmax;
                }

                #[expect(
                    clippy::expect_used,
                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
                )]
                let lmax = version.level(lmax_index).expect("last level should exist");

                if !lmax
                    .aggregate_key_range_cmp(cmp)
                    .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
                {
                    return Choice::Move(CompactionInput {
                        table_ids: l0.list_ids(),
                        #[expect(
                            clippy::cast_possible_truncation,
                            reason = "level count is at most 7, fits in u8"
                        )]
                        dest_level: lmax_index as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // First occupied level below L0; falls back to the last level if all
        // of L1..Lmax are empty.
        #[expect(clippy::map_unwrap_or)]
        let first_non_empty_level = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Absolute level index acting as "canonical L1"
        let mut canonical_l1_idx = first_non_empty_level;

        // Offset converting absolute level index -> canonical level index
        // (canonical = absolute - level_shift)
        let mut level_shift = canonical_l1_idx - 1;

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            // If every occupied level already exceeds its target, open up a
            // new (shallower) canonical L1 by reducing the shift.
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    // Live size, ignoring hidden (in-flight) tables
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7, technically 255)"
                    )]
                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            if need_new_l1 {
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Fast path: a single-run, disjoint L0 that doesn't overlap the
        // (single-run) target level can simply be moved down.
        'trivial: {
            let first_level = version.l0();
            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(target_level_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(target_level_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range_cmp(cmp);

                // Any single overlapping table disqualifies the move
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7)"
                    )]
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: target_level_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Intra-L0 compaction: merge L0's runs together while the table count
        // is still below the L0 threshold.
        {
            let first_level = version.l0();

            if first_level.run_count() > 1
                && first_level.table_count() < usize::from(self.l0_threshold)
                && !version.level_is_busy(0, state.hidden_set())
            {
                return Choice::Merge(CompactionInput {
                    table_ids: first_level.list_ids(),
                    dest_level: 0,
                    canonical_level: 0,
                    target_size: self.target_size,
                });
            }
        }

        let level_targets = self.compute_level_targets(version, level_shift, state);

        // Per-level (score, overshoot bytes); score >= 1.0 means "over target"
        let mut scores = [(0.0, 0u64); 7];

        {
            let first_level = version.l0();

            // L0 is scored by table count relative to the threshold
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                #[expect(
                    clippy::cast_precision_loss,
                    reason = "precision loss is acceptable for scoring calculations"
                )]
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // Deeper levels are scored by live size relative to their target
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                #[expect(clippy::indexing_slicing)]
                let target_size = level_targets[idx];

                #[expect(
                    clippy::indexing_slicing,
                    reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
                )]
                if level_size > target_size {
                    #[expect(
                        clippy::cast_precision_loss,
                        reason = "precision loss is acceptable for scoring calculations"
                    )]
                    let score = level_size as f64 / target_size as f64;
                    scores[idx] = (score, level_size - target_size);

                    // Strongly prioritize oversized levels whose next level is
                    // empty (cheap move/merge opportunity)
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // The last level has no level below it to compact into
            {
                scores[6] = (0.0, 0);
            }
        }

        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        // Nothing is over its target: no compaction needed
        if score < 1.0 {
            return Choice::DoNothing;
        }

        // L0 won: compact L0 (plus overlapping target-level tables) into the
        // canonical L1, possibly extended into L2 when multi-level is enabled.
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range_cmp(cmp);

            // Target-level tables overlapping L0 must be rewritten too
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            if self.multi_level {
                // If L1 is itself over target, extend the compaction down to
                // L2 in one go instead of compacting twice.
                let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
                let l2_idx = canonical_l1_idx + 1;

                if l1_score > 1.0
                    && l2_idx < version.level_count()
                    && !version.level_is_busy(l2_idx, state.hidden_set())
                {
                    if let Some(l2) = version.level(l2_idx) {
                        // Pull in all of L1
                        table_ids.extend(target_level.list_ids());

                        // Collect the merged key intervals of the L0+L1 input
                        // and pull in every L2 table overlapping any of them.
                        {
                            let mut input_ranges: Vec<_> = target_level
                                .iter()
                                .chain(first_level.iter())
                                .flat_map(|run| run.iter())
                                .map(|t| t.key_range().clone())
                                .collect();
                            input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));

                            let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);

                            for run in l2.iter() {
                                for interval in &merged {
                                    for l2t in run.get_overlapping_cmp(interval, cmp) {
                                        table_ids.insert(Table::id(l2t));
                                    }
                                }
                            }
                        }

                        #[expect(
                            clippy::cast_possible_truncation,
                            reason = "level index is bounded by level count (7)"
                        )]
                        return Choice::Merge(CompactionInput {
                            table_ids,
                            dest_level: l2_idx as u8,
                            canonical_level: 2,
                            target_size: self.target_size,
                        });
                    }
                }
            }

            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level count (7, technically 255)"
            )]
            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            // No overlap and L0 disjoint -> cheap move instead of a merge
            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // A deeper level won: compact it into the level directly below.
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level,
            next_level,
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
            cmp,
        ) else {
            return Choice::DoNothing;
        };

        #[expect(
            clippy::cast_possible_truncation,
            reason = "level shift is bounded by level count (7, technically 255)"
        )]
        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}