1use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 config::Config,
8 level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
9 segment::Segment,
10 windows::{GrowingWindowsExt, ShrinkingWindowsExt},
11 HashSet, KeyRange, SegmentId,
12};
13
14fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
17 KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
18}
19
20fn pick_minimal_compaction(
23 curr_level: &Level,
24 next_level: &Level,
25 hidden_set: &HiddenSet,
26 segment_base_size: u64,
27) -> Option<(HashSet<SegmentId>, bool)> {
28 struct Choice {
32 write_amp: f32,
33 segment_ids: HashSet<SegmentId>,
34 can_trivial_move: bool,
35 }
36
37 let mut choices = vec![];
38
39 let mut add_choice = |choice: Choice| {
40 let mut valid_choice = true;
41
42 valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));
45
46 valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;
49
50 if valid_choice {
51 choices.push(choice);
52 }
53 };
54
55 for window in next_level.growing_windows() {
56 let next_level_size = next_level.iter().map(|x| x.metadata.file_size).sum::<u64>();
57
58 if next_level_size > (50 * segment_base_size) {
60 break;
63 }
64
65 if hidden_set.is_blocked(window.iter().map(Segment::id)) {
66 continue;
69 }
70
71 let key_range = aggregate_key_range(window);
72
73 let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
75 curr_level.contained_segments(&key_range).collect()
93 } else {
94 curr_level.overlapping_segments(&key_range).collect()
97 };
98
99 if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
100 continue;
103 }
104
105 let curr_level_size = curr_level_pull_in
106 .iter()
107 .map(|x| x.metadata.file_size)
108 .sum::<u64>();
109
110 if curr_level_size >= 1 {
113 let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();
114
115 let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
116 segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));
117
118 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
119
120 add_choice(Choice {
121 write_amp,
122 segment_ids,
123 can_trivial_move: false,
124 });
125 }
126 }
127
128 for window in curr_level.shrinking_windows() {
130 let key_range = aggregate_key_range(window);
131
132 if next_level.overlapping_segments(&key_range).next().is_none() {
133 add_choice(Choice {
134 write_amp: 0.0,
135 segment_ids: window.iter().map(Segment::id).collect(),
136 can_trivial_move: true,
137 });
138 break;
139 }
140 }
141
142 let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
143 a.write_amp
144 .partial_cmp(&b.write_amp)
145 .unwrap_or(std::cmp::Ordering::Equal)
146 });
147
148 minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
149}
150
/// Settings of the leveled compaction strategy.
///
/// [`Default`] yields `l0_threshold = 4`, `target_size = 64 MiB`
/// and `level_ratio = 10`.
#[derive(Clone)]
pub struct Strategy {
    /// Minimum number of segments in L0 before L0 is considered
    /// for compaction.
    ///
    /// Together with `target_size` it also determines the target size
    /// of L1 (`target_size * l0_threshold`).
    pub l0_threshold: u8,

    /// Target segment size (in bytes) that is passed on to the
    /// compactions chosen by this strategy.
    pub target_size: u32,

    /// Growth factor between the target sizes of two consecutive levels:
    /// the target size of level `n` (`n >= 1`) is
    /// `target_size * l0_threshold * level_ratio^(n - 1)`.
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}
190
191impl Default for Strategy {
192 fn default() -> Self {
193 Self {
194 l0_threshold: 4,
195 target_size:64 * 1_024 * 1_024,
196 level_ratio: 10,
197 }
198 }
199}
200
201impl Strategy {
202 fn level_target_size(&self, level_idx: u8) -> u64 {
211 assert!(level_idx >= 1, "level_target_size does not apply to L0");
212
213 let power = (self.level_ratio as usize).pow(u32::from(level_idx) - 1);
214
215 (power * (self.level_base_size() as usize)) as u64
216 }
217
218 fn level_base_size(&self) -> u64 {
219 u64::from(self.target_size) * u64::from(self.l0_threshold)
220 }
221}
222
223impl CompactionStrategy for Strategy {
224 fn get_name(&self) -> &'static str {
225 "LeveledStrategy"
226 }
227
228 #[allow(clippy::too_many_lines)]
229 fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
230 let view = &levels.levels;
231
232 for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
240 {
241 #[allow(clippy::cast_possible_truncation)]
243 let curr_level_index = curr_level_index as u8;
244
245 let next_level_index = curr_level_index + 1;
246
247 if level.is_empty() {
248 continue;
249 }
250
251 let level_size: u64 = level
252 .segments
253 .iter()
254 .filter(|x| !levels.hidden_set().is_hidden(x.id()))
257 .map(|x| x.metadata.file_size)
258 .sum();
259
260 let desired_bytes = self.level_target_size(curr_level_index);
261
262 let overshoot = level_size.saturating_sub(desired_bytes);
263
264 if overshoot > 0 {
265 let Some(next_level) = &view.get(next_level_index as usize) else {
266 break;
267 };
268
269 let Some((segment_ids, can_trivial_move)) = pick_minimal_compaction(
270 level,
271 next_level,
272 levels.hidden_set(),
273 u64::from(self.target_size),
274 ) else {
275 break;
276 };
277
278 let choice = CompactionInput {
285 segment_ids,
286 dest_level: next_level_index,
287 target_size: u64::from(self.target_size),
288 };
289
290 let goes_into_cold_storage = next_level_index == 2;
299
300 if goes_into_cold_storage {
301 return Choice::Merge(choice);
302 }
303
304 if can_trivial_move && level.is_disjoint {
305 return Choice::Move(choice);
306 }
307 return Choice::Merge(choice);
308 }
309 }
310
311 {
313 let busy_levels = levels.busy_levels();
314
315 let Some(first_level) = view.first() else {
316 return Choice::DoNothing;
317 };
318
319 if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
320 let first_level_size = first_level.size();
321
322 if levels.is_disjoint() {
324 if first_level_size < self.target_size.into() {
325 return if first_level.len() >= 32 {
331 Choice::Merge(CompactionInput {
332 dest_level: 0,
333 segment_ids: first_level.list_ids(),
334 target_size: ((self.target_size as f32) * 1.1) as u64,
336 })
337 } else {
338 Choice::DoNothing
339 };
340 }
341
342 return Choice::Merge(CompactionInput {
343 dest_level: 1,
344 segment_ids: first_level.list_ids(),
345 target_size: ((self.target_size as f32) * 1.1) as u64,
346 });
347 }
348
349 if first_level_size < self.target_size.into() {
350 return Choice::Merge(CompactionInput {
353 dest_level: 0,
354 segment_ids: first_level.list_ids(),
355 target_size: self.target_size.into(),
356 });
357 }
358
359 if !busy_levels.contains(&1) {
360 let mut level = (**first_level).clone();
361 level.sort_by_key_range();
362
363 let Some(next_level) = &view.get(1) else {
364 return Choice::DoNothing;
365 };
366
367 let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();
369
370 let key_range = aggregate_key_range(&level);
372
373 let next_level_overlapping_segment_ids: Vec<_> = next_level
374 .overlapping_segments(&key_range)
375 .map(Segment::id)
376 .collect();
377
378 segment_ids.extend(&next_level_overlapping_segment_ids);
379
380 let choice = CompactionInput {
381 segment_ids,
382 dest_level: 1,
383 target_size: u64::from(self.target_size),
384 };
385
386 if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
387 return Choice::Move(choice);
388 }
389 return Choice::Merge(choice);
390 }
391 }
392 }
393
394 Choice::DoNothing
395 }
396}
397
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        cache::Cache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    /// Target segment size used by every test strategy (64 MiB).
    const TARGET_SIZE: u32 = 64 * 1_024 * 1_024;

    /// Builds the strategy under test with an explicit 64 MiB target size.
    fn strategy_under_test() -> Strategy {
        Strategy {
            target_size: TARGET_SIZE,
            ..Default::default()
        }
    }

    /// Builds the expected merge of `ids` into `dest_level`
    /// with the 64 MiB target size.
    fn merge_into(dest_level: u8, ids: &[SegmentId]) -> Choice {
        Choice::Merge(CompactionInput {
            dest_level,
            segment_ids: ids.iter().copied().collect::<HashSet<_>>(),
            target_size: u64::from(TARGET_SIZE),
        })
    }

    /// Creates a key range from two string bounds.
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        let bounds = (a.as_bytes().into(), b.as_bytes().into());
        KeyRange::new(bounds)
    }

    /// Creates an in-memory segment fixture with the given ID, key range
    /// and file size (in bytes); no file is backing it.
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = Arc::new(BlockIndexImpl::TwoLevel(TwoLevelBlockIndex::new(
            (0, id).into(),
            cache.clone(),
        )));

        let inner = SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            // All pointers are zeroed
            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        };

        inner.into()
    }

    /// Builds a level manifest from a recipe: one inner `Vec` per level, each
    /// entry being `(segment ID, min key, max key, size in MiB)`.
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let manifest_path = path.join("levels");

        let mut manifest =
            LevelManifest::create_new(recipe.len().try_into().expect("oopsie"), manifest_path)?;

        for (level_idx, segments) in recipe.into_iter().enumerate() {
            for (id, min_key, max_key, size_mib) in segments {
                let segment = fixture_segment(
                    id,
                    string_key_range(min_key, max_key),
                    size_mib * 1_024 * 1_024,
                    0.0,
                );

                manifest.insert_into_level(level_idx.try_into().expect("oopsie"), segment);
            }
        }

        Ok(manifest)
    }

    /// An empty tree never needs compaction.
    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = Strategy::default();

        #[rustfmt::skip]
        let manifest = build_levels(dir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, Choice::DoNothing);

        Ok(())
    }

    /// A full L0 is merged into L1, unless one of its segments is hidden.
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = strategy_under_test();

        #[rustfmt::skip]
        let mut manifest = build_levels(dir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, merge_into(1, &[1, 2, 3, 4]));

        manifest.hide_segments(std::iter::once(4));

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, Choice::DoNothing);

        Ok(())
    }

    /// A full, but tiny L0 is merged into itself,
    /// unless one of its segments is hidden.
    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = strategy_under_test();

        #[rustfmt::skip]
        let mut manifest = build_levels(dir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        let expected = Choice::Merge(CompactionInput {
            dest_level: 0,
            segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
            target_size: u64::from(strategy.target_size),
        });

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, expected);

        manifest.hide_segments(std::iter::once(4));

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, Choice::DoNothing);

        Ok(())
    }

    /// L0 does not overlap L1, so only the L0 segments are merged.
    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = strategy_under_test();

        #[rustfmt::skip]
        let manifest = build_levels(dir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, merge_into(1, &[1, 2, 3, 4]));

        Ok(())
    }

    /// L0 overlaps parts of L1, so the overlapping L1 segments are pulled
    /// into the merge; hiding one of them blocks the compaction.
    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = strategy_under_test();

        #[rustfmt::skip]
        let mut manifest = build_levels(dir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, merge_into(1, &[1, 2, 3, 4, 5, 6]));

        manifest.hide_segments(std::iter::once(5));

        let decision = strategy.choose(&manifest, &Config::default());
        assert_eq!(decision, Choice::DoNothing);

        Ok(())
    }

    /// An oversized, non-disjoint L1 is merged together with the
    /// overlapping L2 segment.
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = strategy_under_test();
        let config = Config::default();

        #[rustfmt::skip]
        let manifest = build_levels(dir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        let decision = strategy.choose(&manifest, &config);
        assert_eq!(decision, merge_into(2, &[1, 2, 3, 4, 5, 6]));

        Ok(())
    }
}