1use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 config::Config,
8 level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
9 segment::Segment,
10 windows::{GrowingWindowsExt, ShrinkingWindowsExt},
11 HashSet, KeyRange, SegmentId,
12};
13
14fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
17 KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
18}
19
20fn pick_minimal_compaction(
23 curr_level: &Level,
24 next_level: &Level,
25 hidden_set: &HiddenSet,
26) -> Option<(HashSet<SegmentId>, bool)> {
27 struct Choice {
31 write_amp: f32,
32 segment_ids: HashSet<SegmentId>,
33 can_trivial_move: bool,
34 }
35
36 let mut choices = vec![];
37
38 let mut add_choice = |choice: Choice| {
39 let mut valid_choice = true;
40
41 valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));
44
45 valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;
48
49 if valid_choice {
50 choices.push(choice);
51 }
52 };
53
54 for window in next_level.growing_windows() {
55 if hidden_set.is_blocked(window.iter().map(Segment::id)) {
56 continue;
59 }
60
61 let key_range = aggregate_key_range(window);
62
63 let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
65 curr_level.contained_segments(&key_range).collect()
83 } else {
84 curr_level.overlapping_segments(&key_range).collect()
87 };
88
89 if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
90 continue;
93 }
94
95 let curr_level_size = curr_level_pull_in
96 .iter()
97 .map(|x| x.metadata.file_size)
98 .sum::<u64>();
99
100 if curr_level_size >= 1 {
103 let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();
104
105 let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
106 segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));
107
108 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
109
110 add_choice(Choice {
111 write_amp,
112 segment_ids,
113 can_trivial_move: false,
114 });
115 }
116 }
117
118 for window in curr_level.shrinking_windows() {
120 let key_range = aggregate_key_range(window);
121
122 if next_level.overlapping_segments(&key_range).next().is_none() {
123 add_choice(Choice {
124 write_amp: 0.0,
125 segment_ids: window.iter().map(Segment::id).collect(),
126 can_trivial_move: true,
127 });
128 break;
129 }
130 }
131
132 let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
133 a.write_amp
134 .partial_cmp(&b.write_amp)
135 .unwrap_or(std::cmp::Ordering::Equal)
136 });
137
138 minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
139}
140
/// Settings of the leveled compaction strategy.
///
/// The strategy looks at the first level once it holds at least
/// `l0_threshold` segments, and at a deeper level once its size exceeds
/// that level's target size (see `level_target_size`).
#[derive(Clone)]
pub struct Strategy {
    /// Number of segments in the first level from which on the strategy
    /// considers compacting it.
    ///
    /// It also scales the size budget of L1, which is
    /// `target_size * l0_threshold` bytes.
    ///
    /// Default = 4
    pub l0_threshold: u8,

    /// Target size in bytes that is handed to the chosen compactions.
    ///
    /// Default = 64 MiB
    pub target_size: u32,

    /// Factor by which the size budget grows from one level to the next.
    ///
    /// Default = 10
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}

impl Default for Strategy {
    /// Creates a strategy with an L0 threshold of 4 segments,
    /// a target size of 64 MiB and a level ratio of 10.
    fn default() -> Self {
        Self {
            l0_threshold: 4,
            target_size: 64 * 1_024 * 1_024,
            level_ratio: 10,
        }
    }
}

impl Strategy {
    /// Calculates the size budget (in bytes) of the level `level_idx`.
    ///
    /// L1 = `level_base_size`
    ///
    /// L2 = `level_base_size * level_ratio`
    ///
    /// L3 = `level_base_size * level_ratio * level_ratio`
    ///
    /// ...
    ///
    /// The calculation is done in `u64` and saturates at `u64::MAX`, so a
    /// budget that is too large to represent behaves like "unlimited"
    /// instead of overflowing.
    ///
    /// # Panics
    ///
    /// Panics if `level_idx` is 0, because L0 has no size budget.
    fn level_target_size(&self, level_idx: u8) -> u64 {
        assert!(level_idx >= 1, "level_target_size does not apply to L0");

        let growth = u64::from(self.level_ratio).saturating_pow(u32::from(level_idx) - 1);

        growth.saturating_mul(self.level_base_size())
    }

    /// Returns the size budget of L1 in bytes: `target_size * l0_threshold`.
    ///
    /// Cannot overflow, because a `u32` times a `u8` always fits into a `u64`.
    fn level_base_size(&self) -> u64 {
        u64::from(self.target_size) * u64::from(self.l0_threshold)
    }
}
210
211impl CompactionStrategy for Strategy {
212 fn get_name(&self) -> &'static str {
213 "LeveledStrategy"
214 }
215
216 #[allow(clippy::too_many_lines)]
217 fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
218 let view = &levels.levels;
219
220 for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
228 {
229 #[allow(clippy::cast_possible_truncation)]
231 let curr_level_index = curr_level_index as u8;
232
233 let next_level_index = curr_level_index + 1;
234
235 if level.is_empty() {
236 continue;
237 }
238
239 let level_size: u64 = level
240 .segments
241 .iter()
242 .filter(|x| !levels.hidden_set().is_hidden(x.id()))
245 .map(|x| x.metadata.file_size)
246 .sum();
247
248 let desired_bytes = self.level_target_size(curr_level_index);
249
250 let overshoot = level_size.saturating_sub(desired_bytes);
251
252 if overshoot > 0 {
253 let Some(next_level) = &view.get(next_level_index as usize) else {
254 break;
255 };
256
257 let Some((segment_ids, can_trivial_move)) =
258 pick_minimal_compaction(level, next_level, levels.hidden_set())
259 else {
260 break;
261 };
262
263 let choice = CompactionInput {
270 segment_ids,
271 dest_level: next_level_index,
272 target_size: u64::from(self.target_size),
273 };
274
275 let goes_into_cold_storage = next_level_index == 2;
284
285 if goes_into_cold_storage {
286 return Choice::Merge(choice);
287 }
288
289 if can_trivial_move && level.is_disjoint {
290 return Choice::Move(choice);
291 }
292 return Choice::Merge(choice);
293 }
294 }
295
296 {
298 let busy_levels = levels.busy_levels();
299
300 let Some(first_level) = view.first() else {
301 return Choice::DoNothing;
302 };
303
304 if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
305 let first_level_size = first_level.size();
306
307 if levels.is_disjoint() {
309 if first_level_size < self.target_size.into() {
310 return if first_level.len() >= 32 {
316 Choice::Merge(CompactionInput {
317 dest_level: 0,
318 segment_ids: first_level.list_ids(),
319 target_size: ((self.target_size as f32) * 1.1) as u64,
321 })
322 } else {
323 Choice::DoNothing
324 };
325 }
326
327 return Choice::Merge(CompactionInput {
328 dest_level: 1,
329 segment_ids: first_level.list_ids(),
330 target_size: ((self.target_size as f32) * 1.1) as u64,
331 });
332 }
333
334 if first_level_size < self.target_size.into() {
335 return Choice::Merge(CompactionInput {
338 dest_level: 0,
339 segment_ids: first_level.list_ids(),
340 target_size: self.target_size.into(),
341 });
342 }
343
344 if !busy_levels.contains(&1) {
345 let mut level = (**first_level).clone();
346 level.sort_by_key_range();
347
348 let Some(next_level) = &view.get(1) else {
349 return Choice::DoNothing;
350 };
351
352 let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();
353
354 let key_range = aggregate_key_range(&level);
356
357 let next_level_overlapping_segment_ids: Vec<_> = next_level
358 .overlapping_segments(&key_range)
359 .map(Segment::id)
360 .collect();
361
362 segment_ids.extend(&next_level_overlapping_segment_ids);
363
364 let choice = CompactionInput {
365 segment_ids,
366 dest_level: 1,
367 target_size: u64::from(self.target_size),
368 };
369
370 if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
371 return Choice::Move(choice);
372 }
373 return Choice::Merge(choice);
374 }
375 }
376 }
377
378 Choice::DoNothing
379 }
380}
381
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet, KeyRange,
    };
    use std::{
        path::Path,
        sync::{atomic::AtomicBool, Arc},
    };
    use test_log::test;

    /// Number of bytes in one MiB.
    const MIB: u64 = 1_024 * 1_024;

    /// Creates a key range spanning the two given string keys.
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        let bounds = (a.as_bytes().into(), b.as_bytes().into());
        KeyRange::new(bounds)
    }

    /// Builds the `Choice::Merge` a test expects the strategy to return.
    fn merge(dest_level: u8, ids: &[SegmentId], target_size: u64) -> Choice {
        Choice::Merge(CompactionInput {
            dest_level,
            segment_ids: ids.iter().copied().collect::<HashSet<_>>(),
            target_size,
        })
    }

    /// Creates a segment that only exists in memory, with the given ID,
    /// key range and file size (in bytes).
    ///
    /// The tombstone count is derived from `tombstone_ratio` and the
    /// fixed item count of one million.
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let two_level_index = TwoLevelBlockIndex::new((0, id).into(), Arc::clone(&block_cache));
        let index = Arc::new(BlockIndexImpl::TwoLevel(two_level_index));

        let inner = SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index: index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        };

        inner.into()
    }

    /// Creates a level manifest below `path` from a recipe.
    ///
    /// Every entry of the outer vector is one level; every tuple is one
    /// segment: `(id, min key, max key, size in MiB)`.
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let level_count = recipe.len().try_into().expect("oopsie");

        let mut manifest = LevelManifest::create_new(level_count, path.join("levels"))?;

        for (level_idx, segments) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in segments {
                let segment =
                    fixture_segment(id, string_key_range(min, max), size_mib * 1_024 * 1_024, 0.0);

                manifest.insert_into_level(level_idx.try_into().expect("oopsie"), segment);
            }
        }

        Ok(manifest)
    }

    /// Without any segments there is nothing to compact.
    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(folder.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// Four overlapping 64 MiB segments in L0 are merged into L1;
    /// once one of them is hidden, nothing is chosen.
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(folder.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, merge(1, &[1, 2, 3, 4], 64 * MIB));

        levels.hide_segments(std::iter::once(4));

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// Four overlapping 1 MiB segments in L0 are merged within L0;
    /// once one of them is hidden, nothing is chosen.
    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(folder.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(
            choice,
            merge(0, &[1, 2, 3, 4], u64::from(strategy.target_size))
        );

        levels.hide_segments(std::iter::once(4));

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// L0 does not overlap any segment of L1, so only the L0 segments
    /// are part of the merge into L1.
    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(folder.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, merge(1, &[1, 2, 3, 4], 64 * MIB));

        Ok(())
    }

    /// L0 overlaps segments 5 and 6 of L1, so they are pulled into the merge;
    /// once segment 5 is hidden, nothing is chosen.
    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(folder.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, merge(1, &[1, 2, 3, 4, 5, 6], 64 * MIB));

        levels.hide_segments(std::iter::once(5));

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// L1 holds five overlapping segments (320 MiB) and L0 is empty:
    /// all of L1 is merged into L2 together with the overlapping segment 4.
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(folder.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        let choice = strategy.choose(&levels, &config);
        assert_eq!(choice, merge(2, &[1, 2, 3, 4, 5, 6], 64 * MIB));

        Ok(())
    }
}