// Unit tests for this module; only compiled in test builds.
// The listed clippy lints are relaxed because this is test code.
#[cfg(test)]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::useless_vec,
    clippy::unnecessary_map_or,
    reason = "test code"
)]
mod test;
14
15use super::{Choice, CompactionStrategy, Input as CompactionInput};
16use crate::{
17 HashSet, TableId,
18 compaction::state::{CompactionState, hidden_set::HiddenSet},
19 config::Config,
20 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
21 table::{Table, util::aggregate_run_key_range},
22 version::{Level, Version, run::Ranged},
23};
24
25fn pick_minimal_compaction(
30 curr_level: &Level,
31 next_level: &Level,
32 hidden_set: &HiddenSet,
33 _overshoot: u64,
34 table_base_size: u64,
35 cmp: &dyn crate::comparator::UserComparator,
36) -> Option<(HashSet<TableId>, bool)> {
37 for curr_run in curr_level.iter() {
41 if let Some(window) = curr_run.shrinking_windows().find(|window| {
42 if hidden_set.is_blocked(window.iter().map(Table::id)) {
43 return false;
44 }
45
46 if next_level.is_empty() {
47 return true;
48 }
49
50 let key_range = aggregate_run_key_range(window);
51
52 next_level
54 .iter()
55 .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
56 }) {
57 let ids = window.iter().map(Table::id).collect();
58 return Some((ids, true));
59 }
60 }
61
62 if next_level.is_empty() {
66 return None;
67 }
68
69 next_level
70 .iter()
71 .flat_map(|run| {
72 run.growing_windows().take_while(|window| {
76 let size = window.iter().map(Table::file_size).sum::<u64>();
77 size <= (50 * table_base_size)
78 })
79 })
80 .filter_map(|window| {
81 if hidden_set.is_blocked(window.iter().map(Table::id)) {
82 return None;
83 }
84
85 let key_range = aggregate_run_key_range(window);
86
87 let curr_level_pull_in: Vec<&Table> = curr_level
89 .iter()
90 .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
91 .collect();
92
93 let curr_level_size = curr_level_pull_in
94 .iter()
95 .map(|t| Table::file_size(t))
96 .sum::<u64>();
97
98 if curr_level_size == 0 {
99 return None;
100 }
101
102 if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
103 return None;
104 }
105
106 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
107 let compaction_bytes = curr_level_size + next_level_size;
108
109 Some((window, curr_level_pull_in, compaction_bytes))
110 })
111 .min_by_key(|(_, _, bytes)| *bytes)
112 .map(|(window, curr_level_pull_in, _)| {
113 let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
114 ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
115 (ids, false)
116 })
117}
118
/// Name of the leveled compaction strategy, as returned by its `get_name` implementation.
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
121
/// Configuration of the leveled compaction strategy.
///
/// Created through [`Default`] and customized with the `with_*` builder methods.
#[derive(Clone)]
pub struct Strategy {
    /// Number of L0 tables from which on L0 receives a compaction score;
    /// also the factor applied to `target_size` to get the base level size.
    l0_threshold: u8,

    /// Table target size in bytes; passed on as `target_size` of every
    /// compaction input this strategy produces.
    target_size: u64,

    /// Size ratios between consecutive levels. Levels beyond the end of the
    /// policy reuse its last entry, or `10.0` if the policy is empty.
    level_ratio_policy: Vec<f32>,

    /// When set, level targets are derived from the size of the deepest
    /// non-empty level instead of the static per-level sizes.
    dynamic: bool,

    /// When set, an L0 compaction may also take all of canonical L1 and
    /// merge directly into the level below it.
    multi_level: bool,
}
158
159impl Default for Strategy {
160 fn default() -> Self {
161 Self {
162 l0_threshold: 4,
163 target_size:64 * 1_024 * 1_024,
164 level_ratio_policy: vec![10.0],
165 dynamic: false,
166 multi_level: false,
167 }
168 }
169}
170
171impl Strategy {
172 #[must_use]
178 pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
179 self.level_ratio_policy = policy;
180 self
181 }
182
183 #[must_use]
192 pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
193 self.l0_threshold = threshold;
194 self
195 }
196
197 #[must_use]
203 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
204 self.target_size = bytes;
205 self
206 }
207
208 #[must_use]
219 pub fn with_dynamic_level_bytes(mut self, enabled: bool) -> Self {
220 self.dynamic = enabled;
221 self
222 }
223
224 #[must_use]
233 pub fn with_multi_level(mut self, enabled: bool) -> Self {
234 self.multi_level = enabled;
235 self
236 }
237
238 fn level_base_size(&self) -> u64 {
240 self.target_size * u64::from(self.l0_threshold)
241 }
242
243 fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
253 assert!(
254 canonical_level_idx >= 1,
255 "level_target_size does not apply to L0",
256 );
257
258 if canonical_level_idx == 1 {
259 self.level_base_size()
261 } else {
262 #[expect(
263 clippy::cast_precision_loss,
264 reason = "precision loss is acceptable for level size calculations"
265 )]
266 let mut size = self.level_base_size() as f32;
267
268 for idx in 0..=(canonical_level_idx - 2) {
270 let ratio = self
271 .level_ratio_policy
272 .get(usize::from(idx))
273 .copied()
274 .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
275
276 size *= ratio;
277 }
278
279 #[expect(
280 clippy::cast_possible_truncation,
281 clippy::cast_sign_loss,
282 reason = "size is always positive and will never even come close to u64::MAX"
283 )]
284 {
285 size as u64
286 }
287 }
288 }
289
290 fn compute_level_targets(
298 &self,
299 version: &Version,
300 level_shift: usize,
301 state: &CompactionState,
302 ) -> [u64; 7] {
303 let mut targets = [u64::MAX; 7];
304
305 targets[0] = u64::MAX;
307
308 if self.dynamic {
309 let mut lmax_idx = None;
313
314 for (idx, lvl) in version.iter_levels().enumerate().skip(1) {
315 if !lvl.is_empty() {
316 lmax_idx = Some(idx);
317 }
318 }
319
320 if let Some(lmax_idx) = lmax_idx {
321 #[expect(
322 clippy::expect_used,
323 reason = "lmax_idx was found by iterating levels, so it must exist"
324 )]
325 let lmax_level = version.level(lmax_idx).expect("level should exist");
326
327 let lmax_size: u64 = lmax_level
328 .iter()
329 .flat_map(|run| run.iter())
330 .filter(|table| !state.hidden_set().is_hidden(table.id()))
331 .map(Table::file_size)
332 .sum();
333
334 if let Some(slot) = targets.get_mut(lmax_idx) {
336 *slot = lmax_size;
337 }
338
339 #[expect(
340 clippy::cast_precision_loss,
341 reason = "precision loss is acceptable for level size calculations"
342 )]
343 let mut current_target = lmax_size as f64;
344
345 let dynamic_l1_idx = level_shift + 1;
349
350 for idx in (dynamic_l1_idx..lmax_idx).rev() {
351 let canonical = idx - level_shift;
352 let ratio_idx = canonical.saturating_sub(1);
355 let ratio = f64::from(
356 self.level_ratio_policy
357 .get(ratio_idx)
358 .copied()
359 .unwrap_or_else(|| {
360 self.level_ratio_policy.last().copied().unwrap_or(10.0)
361 }),
362 );
363
364 if !ratio.is_finite() || ratio <= 0.0 {
368 return self.compute_static_targets(level_shift);
369 }
370
371 current_target /= ratio;
372
373 #[expect(
374 clippy::cast_possible_truncation,
375 clippy::cast_sign_loss,
376 reason = "target is always positive"
377 )]
378 if let Some(slot) = targets.get_mut(idx) {
379 *slot = current_target as u64;
380 }
381 }
382
383 let static_l1 = self.level_base_size();
386 if targets.get(dynamic_l1_idx).copied().unwrap_or(0) < static_l1 {
387 return self.compute_static_targets(level_shift);
388 }
389
390 return targets;
391 }
392 }
393
394 self.compute_static_targets(level_shift)
395 }
396
397 fn compute_static_targets(&self, level_shift: usize) -> [u64; 7] {
399 let mut targets = [u64::MAX; 7];
400
401 for (idx, slot) in targets.iter_mut().enumerate().skip(1) {
402 if idx <= level_shift {
403 continue; }
405 #[expect(
406 clippy::cast_possible_truncation,
407 reason = "level index is bounded by level count (7)"
408 )]
409 {
410 *slot = self.level_target_size((idx - level_shift) as u8);
411 }
412 }
413
414 targets
415 }
416}
417
418impl CompactionStrategy for Strategy {
    /// Returns the name of this strategy, which is the `NAME` constant.
    fn get_name(&self) -> &'static str {
        NAME
    }
422
423 fn get_config(&self) -> Vec<crate::KvPair> {
424 vec![
425 (
426 crate::UserKey::from("leveled_l0_threshold"),
427 crate::UserValue::from(self.l0_threshold.to_le_bytes()),
428 ),
429 (
430 crate::UserKey::from("leveled_target_size"),
431 crate::UserValue::from(self.target_size.to_le_bytes()),
432 ),
433 (
434 crate::UserKey::from("leveled_level_ratio_policy"),
435 crate::UserValue::from({
436 use byteorder::{LittleEndian, WriteBytesExt};
437
438 let mut v = vec![];
439
440 #[expect(
441 clippy::expect_used,
442 clippy::cast_possible_truncation,
443 reason = "writing into Vec should not fail; policies have length of 255 max"
444 )]
445 v.write_u8(self.level_ratio_policy.len() as u8)
446 .expect("cannot fail");
447
448 for &f in &self.level_ratio_policy {
449 #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
450 v.write_f32::<LittleEndian>(f).expect("cannot fail");
451 }
452
453 v
454 }),
455 ),
456 (
457 crate::UserKey::from("leveled_dynamic"),
458 crate::UserValue::from([u8::from(self.dynamic)]),
459 ),
460 (
461 crate::UserKey::from("leveled_multi_level"),
462 crate::UserValue::from([u8::from(self.multi_level)]),
463 ),
464 ]
465 }
466
    /// Decides which compaction, if any, to run next for `version`.
    ///
    /// The checks run in this order; the first one that applies wins:
    ///
    /// 1. Trivial move of all of L0 into the last level, if L0 is non-empty
    ///    and disjoint, all levels in between are empty, and the last
    ///    level's key range does not overlap L0's.
    /// 2. Trivial move of a single-run L0 into the target level, if neither
    ///    level is busy, the target level has exactly one run, nothing in
    ///    it overlaps L0, and L0 is disjoint.
    /// 3. Merge of L0 into itself, if it has more than one run but fewer
    ///    tables than `l0_threshold` and is not busy.
    /// 4. Otherwise every level gets a score and the level with the highest
    ///    score is compacted: L0 into canonical L1 (or, with `multi_level`,
    ///    together with all of canonical L1 into the level below it), any
    ///    other level into the next one using `pick_minimal_compaction`.
    ///
    /// Returns `Choice::DoNothing` if the highest score is below 1, if a
    /// required level is missing or busy, or if no compaction set is found.
    ///
    /// # Panics
    ///
    /// Panics if `version` does not have exactly 7 levels.
    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
        assert!(version.level_count() == 7, "should have exactly 7 levels");
        let cmp = config.comparator.as_ref();

        // Trivial move of L0 straight into the last level
        'trivial_lmax: {
            #[expect(
                clippy::expect_used,
                reason = "level 0 is guaranteed to exist in a valid version"
            )]
            let l0 = version.level(0).expect("first level should exist");

            if !l0.is_empty() && l0.is_disjoint() {
                let lmax_index = version.level_count() - 1;

                if (1..lmax_index).any(|idx| {
                    #[expect(
                        clippy::expect_used,
                        reason = "levels within level_count are guaranteed to exist"
                    )]
                    let level = version.level(idx).expect("level should exist");
                    !level.is_empty()
                }) {
                    // Some level between L0 and the last level holds data,
                    // so L0 cannot be moved past it
                    break 'trivial_lmax;
                }

                #[expect(
                    clippy::expect_used,
                    reason = "lmax_index is derived from level_count so level is guaranteed to exist"
                )]
                let lmax = version.level(lmax_index).expect("last level should exist");

                if !lmax
                    .aggregate_key_range_cmp(cmp)
                    .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
                {
                    return Choice::Move(CompactionInput {
                        table_ids: l0.list_ids(),
                        #[expect(
                            clippy::cast_possible_truncation,
                            reason = "level count is at most 7, fits in u8"
                        )]
                        dest_level: lmax_index as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // First level below L0 that holds data; the last level if there is none
        #[expect(clippy::map_unwrap_or)]
        let first_non_empty_level = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Index of the level that currently acts as canonical L1
        let mut canonical_l1_idx = first_non_empty_level;

        // Number of levels skipped between L0 and canonical L1
        let mut level_shift = canonical_l1_idx - 1;

        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            // True if every non-empty level below L0 is larger than its
            // static target size (hidden tables are not counted)
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7, technically 255)"
                    )]
                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            if need_new_l1 {
                // Make the level above the current canonical L1 the new one
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial move of a single-run L0 into the target level
        'trivial: {
            let first_level = version.l0();
            let target_level_idx = first_non_empty_level.min(canonical_l1_idx);

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(target_level_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(target_level_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range_cmp(cmp);

                // ID of the first target-level table overlapping L0, if any
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7)"
                    )]
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: target_level_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Merge L0 into itself while it has several runs but is still below
        // the table threshold
        {
            let first_level = version.l0();

            if first_level.run_count() > 1
                && first_level.table_count() < usize::from(self.l0_threshold)
                && !version.level_is_busy(0, state.hidden_set())
            {
                return Choice::Merge(CompactionInput {
                    table_ids: first_level.list_ids(),
                    dest_level: 0,
                    canonical_level: 0,
                    target_size: self.target_size,
                });
            }
        }

        let level_targets = self.compute_level_targets(version, level_shift, state);

        // Per level: (score, bytes over target)
        let mut scores = [(0.0, 0u64); 7];

        {
            let first_level = version.l0();

            // L0 is scored by table count relative to the threshold
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                #[expect(
                    clippy::cast_precision_loss,
                    reason = "precision loss is acceptable for scoring calculations"
                )]
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            // All other levels are scored by size relative to their target
            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                // Hidden tables are not counted
                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                #[expect(clippy::indexing_slicing)]
                let target_size = level_targets[idx];

                #[expect(
                    clippy::indexing_slicing,
                    reason = "idx is from iter_levels().enumerate() so always < 7 = scores.len()"
                )]
                if level_size > target_size {
                    #[expect(
                        clippy::cast_precision_loss,
                        reason = "precision loss is acceptable for scoring calculations"
                    )]
                    let score = level_size as f64 / target_size as f64;
                    scores[idx] = (score, level_size - target_size);

                    // An oversized level above an empty level gets a fixed,
                    // very high score instead
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            // The last level's score is always reset to zero
            {
                scores[6] = (0.0, 0);
            }
        }

        // Level with the highest score; incomparable scores count as equal
        #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        // No level is over its limit
        if score < 1.0 {
            return Choice::DoNothing;
        }

        // L0 has the highest score: compact it into canonical L1
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            // Start with every table of L0
            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range_cmp(cmp);

            // Add the tables of canonical L1 that overlap L0's key range
            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            if self.multi_level {
                // If canonical L1 is over its target as well, take all of it
                // along and merge directly into the level below
                let l1_score = scores.get(canonical_l1_idx).map_or(0.0, |(s, _)| *s);
                let l2_idx = canonical_l1_idx + 1;

                if l1_score > 1.0
                    && l2_idx < version.level_count()
                    && !version.level_is_busy(l2_idx, state.hidden_set())
                    && let Some(l2) = version.level(l2_idx)
                {
                    table_ids.extend(target_level.list_ids());

                    {
                        // Key ranges of every input table from L0 and canonical L1,
                        // sorted by their minimum key and then merged
                        let mut input_ranges: Vec<_> = target_level
                            .iter()
                            .chain(first_level.iter())
                            .flat_map(|run| run.iter())
                            .map(|t| t.key_range().clone())
                            .collect();
                        input_ranges.sort_by(|a, b| cmp.compare(a.min(), b.min()));

                        let merged = crate::KeyRange::merge_sorted_cmp(input_ranges, cmp);

                        // Add every table of the destination level that
                        // overlaps one of the merged input ranges
                        for run in l2.iter() {
                            for interval in &merged {
                                for l2t in run.get_overlapping_cmp(interval, cmp) {
                                    table_ids.insert(Table::id(l2t));
                                }
                            }
                        }
                    }

                    #[expect(
                        clippy::cast_possible_truncation,
                        reason = "level index is bounded by level count (7)"
                    )]
                    return Choice::Merge(CompactionInput {
                        table_ids,
                        dest_level: l2_idx as u8,
                        canonical_level: 2,
                        target_size: self.target_size,
                    });
                }
            }

            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level count (7, technically 255)"
            )]
            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            // Nothing to merge with and L0 is disjoint: a move is enough
            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // A level other than L0 has the highest score: compact it into the
        // next level
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level,
            next_level,
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
            cmp,
        ) else {
            return Choice::DoNothing;
        };

        #[expect(
            clippy::cast_possible_truncation,
            reason = "level shift is bounded by level count (7, technically 255)"
        )]
        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        // Only move if the picked set needs no merge and the level is disjoint
        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
865}