/// Metadata describing a single run that the compaction policy can inspect.
#[derive(Debug, Clone)]
pub struct RunInfo {
    /// Level the run belongs to; `CompactionPolicy::select_compaction` only
    /// considers runs whose level is `0`.
    pub level: u32,
    /// Identifier of the run; copied into `CompactionDecision::run_ids` when
    /// the run is selected.
    pub run_id: u64,
    /// Size of the run in bytes (per the field name). Not read by the policy
    /// code in this file.
    pub size_bytes: u64,
    /// NOTE(review): presumably the largest per-node delta count stored in
    /// this run; not read by the policy code in this file, so confirm against
    /// the producer.
    pub max_delta_count: u32,
}
/// Graph-wide statistics handed to the compaction policy.
#[derive(Debug, Clone)]
pub struct GraphStats {
    /// NOTE(review): presumably the ids of nodes that are currently "hot";
    /// not read by the policy code in this file, so confirm with the producer.
    pub hot_node_ids: Vec<u64>,
    /// Pending-delta figure that `CompactionPolicy::select_compaction`
    /// compares against `CompactionPolicy::max_deltas_per_node`.
    pub max_pending_deltas: u32,
}
/// The outcome of a policy evaluation: which runs to compact and why.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionDecision {
    /// Level the selected runs are taken from.
    pub source_level: u32,
    /// Level the compaction is directed at.
    pub target_level: u32,
    /// Ids of the runs selected for compaction.
    pub run_ids: Vec<u64>,
    /// Human-readable description of the trigger that produced the decision.
    pub reason: String,
}
/// Thresholds that decide when level-0 runs should be compacted.
///
/// Derives the same comparison/diagnostic traits as `CompactionDecision` so a
/// policy can be logged, cloned and compared like the other public types here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionPolicy {
    /// Pending-delta threshold: `select_compaction` triggers once
    /// `GraphStats::max_pending_deltas` is greater than or equal to this value
    /// (and at least one level-0 run exists).
    pub max_deltas_per_node: u32,
    /// Level-0 run-count threshold: `select_compaction` triggers when the
    /// number of level-0 runs is strictly greater than this value.
    pub max_l0_runs: usize,
}
impl CompactionPolicy {
    /// Creates a policy from its two thresholds.
    ///
    /// * `max_deltas_per_node` – pending-delta threshold (inclusive trigger).
    /// * `max_l0_runs` – level-0 run-count threshold (exclusive trigger).
    pub fn new(max_deltas_per_node: u32, max_l0_runs: usize) -> Self {
        Self {
            max_deltas_per_node,
            max_l0_runs,
        }
    }

    /// Decides whether the level-0 runs in `runs` should be compacted.
    ///
    /// Two triggers are checked, in this order:
    ///
    /// 1. `stats.max_pending_deltas >= self.max_deltas_per_node` and at least
    ///    one level-0 run exists (reason `"max_pending_deltas exceeded"`).
    ///    Note the comparison is inclusive: reaching the threshold is enough.
    /// 2. The number of level-0 runs is strictly greater than
    ///    `self.max_l0_runs` (reason `"l0 run count exceeded"`).
    ///
    /// When a trigger fires, the decision selects *every* level-0 run (in the
    /// order they appear in `runs`) with source level 0 and target level 1.
    /// Returns `None` when neither trigger fires.
    pub fn select_compaction(
        &self,
        runs: &[RunInfo],
        stats: &GraphStats,
    ) -> Option<CompactionDecision> {
        // Both triggers operate on the same set of runs, so gather the
        // level-0 ids once instead of re-scanning `runs` per trigger.
        let level_zero_ids: Vec<u64> = runs
            .iter()
            .filter_map(|run| (run.level == 0).then_some(run.run_id))
            .collect();

        let delta_pressure = stats.max_pending_deltas >= self.max_deltas_per_node;

        // The delta trigger takes precedence; it is skipped when there is
        // nothing at level 0 to compact, in which case the run-count trigger
        // is still evaluated.
        let reason = if delta_pressure && !level_zero_ids.is_empty() {
            "max_pending_deltas exceeded"
        } else if level_zero_ids.len() > self.max_l0_runs {
            "l0 run count exceeded"
        } else {
            return None;
        };

        Some(CompactionDecision {
            source_level: 0,
            target_level: 1,
            run_ids: level_zero_ids,
            reason: reason.to_string(),
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a run with the given level and id; the remaining fields are
    /// filler because the policy does not read them.
    fn run(level: u32, run_id: u64) -> RunInfo {
        RunInfo {
            level,
            run_id,
            size_bytes: 1024,
            max_delta_count: 1,
        }
    }

    /// Builds stats carrying only the pending-delta figure the policy reads.
    fn stats(max_pending_deltas: u32) -> GraphStats {
        GraphStats {
            hot_node_ids: Vec::new(),
            max_pending_deltas,
        }
    }

    #[test]
    fn compaction_triggers_on_pending_deltas() {
        let policy = CompactionPolicy::new(32, 4);
        let runs = vec![run(0, 1), run(0, 2)];

        let decision = policy.select_compaction(&runs, &stats(40)).unwrap();

        assert_eq!(decision.source_level, 0);
        assert_eq!(decision.target_level, 1);
        assert_eq!(decision.run_ids, vec![1, 2]);
        assert_eq!(decision.reason, "max_pending_deltas exceeded");
    }

    #[test]
    fn compaction_triggers_on_l0_run_count() {
        let policy = CompactionPolicy::new(100, 2);
        let runs = vec![run(0, 1), run(0, 2), run(0, 3)];

        let decision = policy.select_compaction(&runs, &stats(0)).unwrap();

        assert_eq!(decision.source_level, 0);
        assert_eq!(decision.target_level, 1);
        assert_eq!(decision.run_ids, vec![1, 2, 3]);
        assert_eq!(decision.reason, "l0 run count exceeded");
    }

    #[test]
    fn no_compaction_below_both_thresholds() {
        let policy = CompactionPolicy::new(100, 4);
        let runs = vec![run(0, 1), run(0, 2)];

        assert_eq!(policy.select_compaction(&runs, &stats(5)), None);
    }

    #[test]
    fn no_compaction_when_run_count_equals_limit() {
        // The run-count trigger is strict: exactly `max_l0_runs` is allowed.
        let policy = CompactionPolicy::new(100, 2);
        let runs = vec![run(0, 1), run(0, 2)];

        assert_eq!(policy.select_compaction(&runs, &stats(0)), None);
    }

    #[test]
    fn pending_deltas_without_l0_runs_yields_nothing() {
        // Delta pressure alone is not enough; there must be level-0 runs.
        let policy = CompactionPolicy::new(32, 4);
        let runs = vec![run(1, 10), run(2, 11)];

        assert_eq!(policy.select_compaction(&runs, &stats(40)), None);
        assert_eq!(policy.select_compaction(&[], &stats(40)), None);
    }

    #[test]
    fn only_level_zero_runs_are_selected() {
        let policy = CompactionPolicy::new(100, 1);
        let runs = vec![run(0, 1), run(1, 2), run(0, 3)];

        let decision = policy.select_compaction(&runs, &stats(0)).unwrap();

        assert_eq!(decision.run_ids, vec![1, 3]);
        assert_eq!(decision.reason, "l0 run count exceeded");
    }
}