1use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{
7 config::Config,
8 key_range::KeyRange,
9 level_manifest::{hidden_set::HiddenSet, level::Level, LevelManifest},
10 segment::Segment,
11 windows::{GrowingWindowsExt, ShrinkingWindowsExt},
12 HashSet, SegmentId,
13};
14
15fn aggregate_key_range(segments: &[Segment]) -> KeyRange {
17 KeyRange::aggregate(segments.iter().map(|x| &x.metadata.key_range))
18}
19
20fn pick_minimal_compaction(
23 curr_level: &Level,
24 next_level: &Level,
25 hidden_set: &HiddenSet,
26) -> Option<(HashSet<SegmentId>, bool)> {
27 struct Choice {
31 write_amp: f32,
32 segment_ids: HashSet<SegmentId>,
33 can_trivial_move: bool,
34 }
35
36 let mut choices = vec![];
37
38 let mut add_choice = |choice: Choice| {
39 let mut valid_choice = true;
40
41 valid_choice &= !choice.segment_ids.iter().any(|x| hidden_set.is_hidden(*x));
44
45 valid_choice &= choice.can_trivial_move || choice.segment_ids.len() <= 25;
48
49 if valid_choice {
50 choices.push(choice);
51 }
52 };
53
54 for window in next_level.growing_windows() {
55 if hidden_set.is_blocked(window.iter().map(Segment::id)) {
56 continue;
59 }
60
61 let key_range = aggregate_key_range(window);
62
63 let curr_level_pull_in: Vec<_> = if curr_level.is_disjoint {
65 curr_level.contained_segments(&key_range).collect()
83 } else {
84 curr_level.overlapping_segments(&key_range).collect()
87 };
88
89 if hidden_set.is_blocked(curr_level_pull_in.iter().map(|x| x.id())) {
90 continue;
93 }
94
95 let curr_level_size = curr_level_pull_in
96 .iter()
97 .map(|x| x.metadata.file_size)
98 .sum::<u64>();
99
100 if curr_level_size >= 1 {
103 let next_level_size = window.iter().map(|x| x.metadata.file_size).sum::<u64>();
104
105 let mut segment_ids: HashSet<_> = window.iter().map(Segment::id).collect();
106 segment_ids.extend(curr_level_pull_in.iter().map(|x| x.id()));
107
108 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
109
110 add_choice(Choice {
111 write_amp,
112 segment_ids,
113 can_trivial_move: false,
114 });
115 }
116 }
117
118 for window in curr_level.shrinking_windows() {
120 let key_range = aggregate_key_range(window);
121
122 if next_level.overlapping_segments(&key_range).next().is_none() {
123 add_choice(Choice {
124 write_amp: 0.0,
125 segment_ids: window.iter().map(Segment::id).collect(),
126 can_trivial_move: true,
127 });
128 break;
129 }
130 }
131
132 let minimum_effort_choice = choices.into_iter().min_by(|a, b| {
133 a.write_amp
134 .partial_cmp(&b.write_amp)
135 .unwrap_or(std::cmp::Ordering::Equal)
136 });
137
138 minimum_effort_choice.map(|c| (c.segment_ids, c.can_trivial_move))
139}
140
/// Leveled compaction strategy.
///
/// L0 compaction is considered once L0 holds `l0_threshold` segments. Every level from
/// L1 up to the second-to-last level has a byte-size budget
/// (`target_size * l0_threshold * level_ratio^(n - 1)` for Ln); when a level exceeds it,
/// part of that level is compacted into the next level.
#[derive(Clone)]
pub struct Strategy {
    /// Minimum number of segments in L0 before an L0 compaction is considered.
    ///
    /// Also scales the size budget of L1 (`target_size * l0_threshold`).
    ///
    /// Default = 4
    pub l0_threshold: u8,

    /// Target segment size in bytes, handed to compactions as their `target_size`
    /// (some L0 compactions use 1.1x this value).
    ///
    /// Default = 64 MiB
    pub target_size: u32,

    /// Size ratio between consecutive levels (fan-out).
    ///
    /// The size budget of Ln (n >= 1) is `target_size * l0_threshold * level_ratio^(n - 1)`.
    ///
    /// Default = 10
    #[allow(clippy::doc_markdown)]
    pub level_ratio: u8,
}
178
179impl Default for Strategy {
180 fn default() -> Self {
181 Self {
182 l0_threshold: 4,
183 target_size:64 * 1_024 * 1_024,
184 level_ratio: 10,
185 }
186 }
187}
188
189impl Strategy {
190 fn level_target_size(&self, level_idx: u8) -> u64 {
199 assert!(level_idx >= 1, "level_target_size does not apply to L0");
200
201 let power = (self.level_ratio as usize).pow(u32::from(level_idx) - 1);
202
203 (power * (self.level_base_size() as usize)) as u64
204 }
205
206 fn level_base_size(&self) -> u64 {
207 self.target_size as u64 * self.l0_threshold as u64
208 }
209}
210
211impl CompactionStrategy for Strategy {
212 fn get_name(&self) -> &'static str {
213 "LeveledStrategy"
214 }
215
216 #[allow(clippy::too_many_lines)]
217 fn choose(&self, levels: &LevelManifest, _: &Config) -> Choice {
218 let view = &levels.levels;
219
220 for (curr_level_index, level) in view.iter().enumerate().skip(1).take(view.len() - 2).rev()
222 {
223 #[allow(clippy::cast_possible_truncation)]
225 let curr_level_index = curr_level_index as u8;
226
227 let next_level_index = curr_level_index + 1;
228
229 if level.is_empty() {
230 continue;
231 }
232
233 let level_size: u64 = level
234 .segments
235 .iter()
236 .filter(|x| !levels.hidden_set().is_hidden(x.id()))
239 .map(|x| x.metadata.file_size)
240 .sum();
241
242 let desired_bytes = self.level_target_size(curr_level_index);
243
244 let overshoot = level_size.saturating_sub(desired_bytes);
245
246 if overshoot > 0 {
247 let Some(next_level) = &view.get(next_level_index as usize) else {
248 break;
249 };
250
251 let Some((segment_ids, can_trivial_move)) =
252 pick_minimal_compaction(level, next_level, levels.hidden_set())
253 else {
254 break;
255 };
256
257 let choice = CompactionInput {
264 segment_ids,
265 dest_level: next_level_index,
266 target_size: u64::from(self.target_size),
267 };
268
269 let goes_into_cold_storage = next_level_index == 2;
278
279 if goes_into_cold_storage {
280 return Choice::Merge(choice);
281 }
282
283 if can_trivial_move && level.is_disjoint {
284 return Choice::Move(choice);
285 }
286 return Choice::Merge(choice);
287 }
288 }
289
290 {
292 let busy_levels = levels.busy_levels();
293
294 let Some(first_level) = view.first() else {
295 return Choice::DoNothing;
296 };
297
298 if first_level.len() >= self.l0_threshold.into() && !busy_levels.contains(&0) {
299 let first_level_size = first_level.size();
300
301 if levels.is_disjoint() {
303 if first_level_size < self.target_size.into() {
304 return if first_level.len() >= 32 {
310 Choice::Merge(CompactionInput {
311 dest_level: 0,
312 segment_ids: first_level.list_ids(),
313 target_size: ((self.target_size as f32) * 1.1) as u64,
315 })
316 } else {
317 Choice::DoNothing
318 };
319 }
320
321 return Choice::Merge(CompactionInput {
322 dest_level: 1,
323 segment_ids: first_level.list_ids(),
324 target_size: ((self.target_size as f32) * 1.1) as u64,
325 });
326 }
327
328 if first_level_size < self.target_size.into() {
329 return Choice::Merge(CompactionInput {
332 dest_level: 0,
333 segment_ids: first_level.list_ids(),
334 target_size: self.target_size.into(),
335 });
336 }
337
338 if !busy_levels.contains(&1) {
339 let mut level = (**first_level).clone();
340 level.sort_by_key_range();
341
342 let Some(next_level) = &view.get(1) else {
343 return Choice::DoNothing;
344 };
345
346 let mut segment_ids: HashSet<u64> = level.iter().map(Segment::id).collect();
347
348 let key_range = aggregate_key_range(&level);
350
351 let next_level_overlapping_segment_ids: Vec<_> = next_level
352 .overlapping_segments(&key_range)
353 .map(Segment::id)
354 .collect();
355
356 segment_ids.extend(&next_level_overlapping_segment_ids);
357
358 let choice = CompactionInput {
359 segment_ids,
360 dest_level: 1,
361 target_size: u64::from(self.target_size),
362 };
363
364 if next_level_overlapping_segment_ids.is_empty() && level.is_disjoint {
365 return Choice::Move(choice);
366 }
367 return Choice::Merge(choice);
368 }
369 }
370 }
371
372 Choice::DoNothing
373 }
374}
375
// Unit tests for the leveled compaction strategy.
//
// Levels are built from in-memory segment fixtures; with the default-like strategy used
// here (`l0_threshold` = 4, `target_size` = 64 MiB, `level_ratio` = 10) the L1 budget is
// 4 * 64 MiB = 256 MiB.
#[cfg(test)]
mod tests {
    use super::{Choice, Strategy};
    use crate::{
        block_cache::BlockCache,
        compaction::{CompactionStrategy, Input as CompactionInput},
        descriptor_table::FileDescriptorTable,
        key_range::KeyRange,
        level_manifest::LevelManifest,
        segment::{
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            value_block::BlockOffset,
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        Config, HashSet,
    };
    use std::{path::Path, sync::Arc};
    use test_log::test;

    /// Builds a key range whose bounds are the bytes of `a` and `b`.
    fn string_key_range(a: &str, b: &str) -> KeyRange {
        KeyRange::new((a.as_bytes().into(), b.as_bytes().into()))
    }

    /// Builds a segment fixture with the given id, key range and file size (bytes).
    ///
    /// Only the metadata matters to the strategy; all file offsets are 0 and the block
    /// counts are 0. `tombstone_ratio` sets the tombstone count relative to a fixed
    /// item count of 1,000,000.
    #[allow(
        clippy::expect_used,
        clippy::cast_possible_truncation,
        clippy::cast_sign_loss
    )]
    fn fixture_segment(
        id: SegmentId,
        key_range: KeyRange,
        size: u64,
        tombstone_ratio: f32,
    ) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets: FileOffsets {
                bloom_ptr: BlockOffset(0),
                range_filter_ptr: BlockOffset(0),
                index_block_ptr: BlockOffset(0),
                metadata_ptr: BlockOffset(0),
                range_tombstones_ptr: BlockOffset(0),
                tli_ptr: BlockOffset(0),
                pfx_ptr: BlockOffset(0),
            },

            metadata: Metadata {
                data_block_count: 0,
                index_block_count: 0,
                data_block_size: 4_096,
                index_block_size: 4_096,
                created_at: unix_timestamp().as_nanos(),
                id,
                file_size: size,
                compression: crate::segment::meta::CompressionType::None,
                table_type: crate::segment::meta::TableType::Block,
                item_count: 1_000_000,
                key_count: 0,
                key_range,
                tombstone_count: (1_000_000.0 * tombstone_ratio) as u64,
                range_tombstone_count: 0,
                uncompressed_size: 0,
                seqnos: (0, 0),
            },
            block_cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
        }
        .into()
    }

    /// Creates a level manifest with one level per entry of `recipe`.
    ///
    /// Each segment is described as `(id, min_key, max_key, size_in_mib)`.
    #[allow(clippy::expect_used)]
    fn build_levels(
        path: &Path,
        recipe: Vec<Vec<(SegmentId, &str, &str, u64)>>,
    ) -> crate::Result<LevelManifest> {
        let mut levels = LevelManifest::create_new(
            recipe.len().try_into().expect("oopsie"),
            path.join("levels"),
        )?;

        for (idx, level) in recipe.into_iter().enumerate() {
            for (id, min, max, size_mib) in level {
                levels.insert_into_level(
                    idx.try_into().expect("oopsie"),
                    fixture_segment(
                        id,
                        string_key_range(min, max),
                        size_mib * 1_024 * 1_024,
                        0.0,
                    ),
                );
            }
        }

        Ok(levels)
    }

    // An empty tree never needs compaction.
    #[test]
    fn leveled_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    // L0 reaching `l0_threshold` segments (each already target-sized) is merged into L1;
    // once one L0 segment is hidden (L0 busy), nothing is chosen.
    #[test]
    fn leveled_default_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "a", "z", 64), (4, "a", "z", 64)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    // L0 totalling less than one target-sized segment (4 x 1 MiB) is merged within L0
    // (dest_level 0) instead of being pushed into L1.
    #[test]
    #[allow(
        clippy::cast_sign_loss,
        clippy::cast_precision_loss,
        clippy::cast_possible_truncation
    )]
    fn leveled_intra_l0() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "z", 1), (2, "a", "z", 1), (3, "a", "z", 1), (4, "a", "z", 1)],
            vec![],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 0,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: u64::from(compactor.target_size),
            })
        );

        levels.hide_segments(std::iter::once(4));

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    // L1 (exactly at its 256 MiB budget) does not overlap L0's key range, so only the L0
    // segments take part; they overlap each other, hence a merge rather than a move.
    #[test]
    fn leveled_more_than_min_no_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![(1, "h", "t", 64), (2, "h", "t", 64), (3, "h", "t", 64), (4, "h", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "a", "g", 64), (8, "a", "g", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }

    // L1 segments overlapping L0's key range (5, 6) are pulled into the L0 -> L1 merge;
    // once one of them is hidden (L1 busy), nothing is chosen.
    #[test]
    fn leveled_more_than_min_with_overlap() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };

        #[rustfmt::skip]
        let mut levels = build_levels(tempdir.path(), vec![
            vec![(1, "a", "g", 64), (2, "h", "t", 64), (3, "i", "t", 64), (4, "j", "t", 64)],
            vec![(5, "a", "g", 64), (6, "a", "g", 64), (7, "y", "z", 64), (8, "y", "z", 64)],
            vec![],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::Merge(CompactionInput {
                dest_level: 1,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        levels.hide_segments(std::iter::once(5));
        assert_eq!(
            compactor.choose(&levels, &Config::default()),
            Choice::DoNothing
        );

        Ok(())
    }

    // A non-disjoint L1 over its budget (5 x 64 MiB > 256 MiB) is merged into L2 together
    // with the overlapping L2 segment; L1 -> L2 compactions are always merges.
    #[test]
    fn levelled_from_tiered() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy {
            target_size: 64 * 1_024 * 1_024,
            ..Default::default()
        };
        let config = Config::default();

        #[rustfmt::skip]
        let levels = build_levels(tempdir.path(), vec![
            vec![],
            vec![(1, "a", "z", 64), (2, "a", "z", 64), (3, "g", "z", 64), (5, "g", "z", 64), (6, "g", "z", 64)],
            vec![(4, "a", "g", 64)],
            vec![],
        ])?;

        assert_eq!(
            compactor.choose(&levels, &config),
            Choice::Merge(CompactionInput {
                dest_level: 2,
                segment_ids: [1, 2, 3, 4, 5, 6].into_iter().collect::<HashSet<_>>(),
                target_size: 64 * 1_024 * 1_024
            })
        );

        Ok(())
    }
}