use std::sync::Arc;
use crate::{
levels::KeyRange,
segment::Segment,
};
/// The kinds of work a compaction job can represent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompactionJobType {
    Flush,
    L0Compaction,
    LevelCompaction,
    TrivialMove,
    Manual,
}

impl CompactionJobType {
    /// Returns the fixed base priority number associated with this job type.
    ///
    /// The values are `TrivialMove = 0`, `Flush = 1`, `L0Compaction = 2`,
    /// `Manual = 2` and `LevelCompaction = 3`.
    ///
    /// NOTE(review): smaller numbers presumably mean "run sooner" — confirm
    /// against the scheduler that consumes this value.
    pub fn base_priority(&self) -> u32 {
        match *self {
            // Instant, no I/O
            Self::TrivialMove => 0,
            Self::Flush => 1,
            // Manual jobs are user-initiated and should be fast, so they
            // share the same number as L0 compactions.
            Self::L0Compaction | Self::Manual => 2,
            Self::LevelCompaction => 3,
        }
    }

    /// Returns `true` only for [`CompactionJobType::Flush`]; every other job
    /// type reports `false`.
    pub fn blocks_writes(&self) -> bool {
        *self == Self::Flush
    }
}
/// A group of segments taken from one level, together with the key range
/// they span and their combined size.
#[derive(Clone)]
pub struct CompactionInput {
    /// Level the segments belong to.
    pub level: u8,
    /// The segments that make up this input.
    pub segments: Vec<Arc<Segment>>,
    /// Smallest start key to largest end key across the segments.
    pub key_range: KeyRange,
    /// Sum of `size_in_bytes()` over all segments.
    pub total_size: u64,
}

impl CompactionInput {
    /// Builds an input for `level` from `segments`, deriving the covering key
    /// range from the entries of `key_ranges` whose `segment_id` matches a
    /// segment's id.
    ///
    /// For each segment the first matching entry in `key_ranges` is used;
    /// segments with no matching entry are skipped when computing the range.
    /// The resulting [`KeyRange`] is tagged with the id of the first segment.
    ///
    /// # Panics
    ///
    /// Panics if `segments` is empty, or if none of the segments has a
    /// matching entry in `key_ranges`.
    pub fn with_key_range(level: u8, segments: Vec<Arc<Segment>>, key_ranges: &[KeyRange]) -> Self {
        assert!(
            !segments.is_empty(),
            "Cannot create CompactionInput with empty segments"
        );

        let total_size = segments.iter().map(|seg| seg.size_in_bytes()).sum::<u64>();

        // Pair every segment with its first matching key range, dropping
        // segments that have none.
        let matched = segments.iter().filter_map(|seg| {
            key_ranges
                .iter()
                .find(|candidate| candidate.segment_id == seg.id())
        });

        let mut lowest: Option<Vec<u8>> = None;
        let mut highest: Option<Vec<u8>> = None;
        for range in matched {
            // Only a strictly smaller start replaces the current minimum.
            if lowest.as_ref().map_or(true, |low| range.start < *low) {
                lowest = Some(range.start.clone());
            }
            // Only a strictly larger end replaces the current maximum.
            if highest.as_ref().map_or(true, |high| range.end > *high) {
                highest = Some(range.end.clone());
            }
        }

        let key_range = KeyRange::new(
            lowest.expect("Segment should have key range"),
            highest.expect("Segment should have key range"),
            segments[0].id(),
        );

        Self {
            level,
            segments,
            key_range,
            total_size,
        }
    }

    /// Number of segments in this input.
    pub fn num_segments(&self) -> usize {
        self.segments.len()
    }
}
/// Describes where a compaction writes its results.
#[derive(Clone)]
pub struct CompactionOutput {
    /// Level the output is written to.
    pub level: u8,
    /// Target size for each output segment (presumably in bytes — confirm
    /// against the writer that consumes it).
    pub target_segment_size: u64,
    /// Optional limit on the number of output segments; `None` when unset.
    pub max_segments: Option<usize>,
}

impl CompactionOutput {
    /// Creates an output description for `level` with the given target
    /// segment size and no segment-count limit.
    pub fn new(level: u8, target_segment_size: u64) -> Self {
        let max_segments = None;
        Self {
            level,
            target_segment_size,
            max_segments,
        }
    }

    /// Builder-style setter: returns `self` with `max_segments` set to
    /// `Some(max)`, leaving the other fields untouched.
    pub fn with_max_segments(self, max: usize) -> Self {
        Self {
            max_segments: Some(max),
            ..self
        }
    }
}
pub struct CompactionJob {
pub id: u64,
pub job_type: CompactionJobType,
pub input: CompactionInput,
pub next_level_input: Option<CompactionInput>,
pub output: CompactionOutput,
pub score: f64,
pub can_parallelize: bool,
pub allocated_segment_ids: Vec<u64>,
}
impl CompactionJob {
pub fn new(
id: u64,
job_type: CompactionJobType,
input: CompactionInput,
next_level_input: Option<CompactionInput>,
output: CompactionOutput,
allocated_segment_ids: Vec<u64>,
) -> Self {
let score = Self::calculate_score(job_type, &input, next_level_input.as_ref());
let can_parallelize = matches!(
job_type,
CompactionJobType::TrivialMove | CompactionJobType::Flush
);
Self {
id,
job_type,
input,
next_level_input,
output,
score,
can_parallelize,
allocated_segment_ids,
}
}
fn calculate_score(
job_type: CompactionJobType,
input: &CompactionInput,
next_level_input: Option<&CompactionInput>,
) -> f64 {
match job_type {
| CompactionJobType::TrivialMove => {
100.0
},
| CompactionJobType::Flush => {
input.num_segments() as f64 * 10.0
},
| CompactionJobType::L0Compaction => {
let l0_files = input.num_segments() as f64;
let l0_size = input.total_size as f64;
let file_score = l0_files * 5.0;
let size_score = (l0_size / (64.0 * 1024.0 * 1024.0)) * 2.0;
file_score + size_score
},
| CompactionJobType::LevelCompaction => {
let input_size = input.total_size as f64;
let next_level_size = next_level_input.map(|n| n.total_size as f64).unwrap_or(0.0);
let target_size = 64.0 * 1024.0 * 1024.0 * 10_f64.powi(input.level as i32);
let total_size = input_size + next_level_size;
total_size / target_size
},
| CompactionJobType::Manual => {
50.0
},
}
}
pub fn total_input_segments(&self) -> usize {
let mut count = self.input.num_segments();
if let Some(ref next) = self.next_level_input {
count += next.num_segments();
}
count
}
pub fn total_input_size(&self) -> u64 {
let mut size = self.input.total_size;
if let Some(ref next) = self.next_level_input {
size += next.total_size;
}
size
}
pub fn write_amplification(&self) -> f64 {
if self.input.total_size == 0 {
return 1.0;
}
match self.job_type {
| CompactionJobType::TrivialMove => 0.0, // No writes
| CompactionJobType::Flush => 1.0, | CompactionJobType::L0Compaction | CompactionJobType::LevelCompaction => {
self.total_input_size() as f64 / self.input.total_size as f64
},
| CompactionJobType::Manual => {
self.total_input_size() as f64 / self.input.total_size as f64
},
}
}
pub fn overlaps_with(&self, other: &CompactionJob) -> bool {
if self.input.level == other.input.level {
return self.input.key_range.overlaps(&other.input.key_range);
}
if self.output.level == other.input.level {
return self.input.key_range.overlaps(&other.input.key_range);
}
if other.output.level == self.input.level {
return other.input.key_range.overlaps(&self.input.key_range);
}
false
}
}
/// Compact one-line rendering of a job: id, type, source and destination
/// levels, total input segment count, total input size in MB (one decimal)
/// and score (two decimals).
impl std::fmt::Debug for CompactionJob {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let file_count = self.total_input_segments();
        // Bytes to mebibytes for display.
        let megabytes = self.total_input_size() as f64 / (1024.0 * 1024.0);

        write!(
            f,
            "Job#{} {:?} L{}→L{} ({} files, {:.1} MB, score={:.2})",
            self.id,
            self.job_type,
            self.input.level,
            self.output.level,
            file_count,
            megabytes,
            self.score
        )
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an input fixture with no segments, an empty key range and the
    /// given level and total size.
    fn sized_input(level: u8, total_size: u64) -> CompactionInput {
        CompactionInput {
            level,
            segments: vec![],
            key_range: KeyRange::new(vec![], vec![], 0),
            total_size,
        }
    }

    /// Base priorities are strictly ordered TrivialMove < Flush < L0 < Level.
    #[test]
    fn test_job_type_priority() {
        assert!(
            CompactionJobType::TrivialMove.base_priority()
                < CompactionJobType::Flush.base_priority()
        );
        assert!(
            CompactionJobType::Flush.base_priority()
                < CompactionJobType::L0Compaction.base_priority()
        );
        assert!(
            CompactionJobType::L0Compaction.base_priority()
                < CompactionJobType::LevelCompaction.base_priority()
        );
    }

    /// Only flush jobs report that they block writes.
    #[test]
    fn test_job_type_blocks_writes() {
        assert!(CompactionJobType::Flush.blocks_writes());
        assert!(!CompactionJobType::L0Compaction.blocks_writes());
        assert!(!CompactionJobType::LevelCompaction.blocks_writes());
        assert!(!CompactionJobType::TrivialMove.blocks_writes());
        assert!(!CompactionJobType::Manual.blocks_writes());
    }

    /// The builder keeps the constructor arguments and records the limit.
    #[test]
    fn test_compaction_output_builder() {
        let output = CompactionOutput::new(1, 64 * 1024 * 1024).with_max_segments(10);
        assert_eq!(output.level, 1);
        assert_eq!(output.target_segment_size, 64 * 1024 * 1024);
        assert_eq!(output.max_segments, Some(10));
    }

    /// Flush score is 10 points per input segment and ignores the byte size.
    ///
    /// The fixture has no segments, so the score must be zero even though
    /// `total_size` is large. This goes through `calculate_score` itself
    /// rather than re-deriving the formula in the test.
    #[test]
    fn test_score_calculation_flush() {
        let input = sized_input(0, 256 * 1024 * 1024);
        let score = CompactionJob::calculate_score(CompactionJobType::Flush, &input, None);
        assert_eq!(score, input.num_segments() as f64 * 10.0);
        assert_eq!(score, 0.0);
    }

    /// Trivial moves always score 100.
    #[test]
    fn test_trivial_move_highest_score() {
        let input = sized_input(1, 64 * 1024 * 1024);
        let score = CompactionJob::calculate_score(CompactionJobType::TrivialMove, &input, None);
        assert_eq!(score, 100.0);
    }

    /// 100 MB primary input + 400 MB next-level input => (100 + 400) / 100.
    #[test]
    fn test_write_amplification() {
        let input = sized_input(1, 100 * 1024 * 1024);
        let next_level = sized_input(2, 400 * 1024 * 1024);
        let output = CompactionOutput::new(2, 64 * 1024 * 1024);
        let job = CompactionJob::new(
            1,
            CompactionJobType::LevelCompaction,
            input,
            Some(next_level),
            output,
            vec![100],
        );
        assert_eq!(job.write_amplification(), 5.0);
    }

    /// Trivial moves with a non-empty input report zero write amplification.
    #[test]
    fn test_trivial_move_zero_write_amp() {
        let input = sized_input(1, 64 * 1024 * 1024);
        let output = CompactionOutput::new(2, 64 * 1024 * 1024);
        let job = CompactionJob::new(
            1,
            CompactionJobType::TrivialMove,
            input,
            None,
            output,
            vec![],
        );
        assert_eq!(job.write_amplification(), 0.0);
    }
}