use std::collections::HashSet;
use crate::iceberg::version::Version;
use crate::types::level::Level;
use crate::engine::config::EngineConfig;
/// Result of a successful compaction selection: the level to compact, the
/// level paired with it as output, and the input files chosen from it.
#[derive(Debug, Clone)]
pub struct CompactionPick {
/// Level the input files were selected from.
pub input_level: Level,
/// Output level for the compaction; `pick_compaction` sets this to
/// `input_level.next()`.
pub output_level: Level,
/// Score of `input_level` at selection time. `pick_compaction` computes it
/// as L0 file count / `l0_compaction_trigger` for L0, and as level bytes /
/// the level's target bytes for other levels.
pub score: f64,
/// Paths of the selected input files, in the order they were picked.
pub input_files: Vec<String>,
}
/// Chooses the next compaction to run, or `None` when nothing qualifies.
///
/// Selection happens in two stages.
///
/// 1. **Level choice.** A level is eligible only when neither it nor its
///    `next()` level is in `busy_levels`. If L0 is eligible, non-empty, and
///    holds at least `config.l0_slowdown_trigger` files, it is chosen
///    outright. Otherwise every eligible level is scored: L0 as
///    `l0_file_count / l0_compaction_trigger`, and the level for entry `i` of
///    `config.level_target_bytes` (level `i + 1`) as
///    `level_bytes / target`, skipping targets that are not positive. The
///    highest score wins if it is at least `1.0`; on equal scores the
///    shallower level is kept.
/// 2. **File choice.** L0 files are visited in ascending `seq_min`; files of
///    other levels in the order `Version::files_at` returns them. A prefix is
///    taken until adding the next file would push the running total past
///    `config.max_compaction_bytes` or, when non-zero,
///    `config.max_compaction_input_rows`. The first file is always taken.
///
/// # Returns
///
/// `Some(CompactionPick)` with `output_level == input_level.next()`, or `None`
/// when no eligible level scores at least `1.0` or the chosen level has no
/// files.
///
/// # Notes
///
/// Level numbers are `u8`, so entries of `level_target_bytes` past the 255th
/// cannot name a level and are ignored.
pub fn pick_compaction(
    version: &Version,
    config: &EngineConfig,
    busy_levels: &HashSet<Level>,
) -> Option<CompactionPick> {
    // A compaction occupies its input level and the level after it, so both
    // must be idle for a level to be a candidate.
    let level_is_free =
        |lvl: Level| -> bool { !busy_levels.contains(&lvl) && !busy_levels.contains(&lvl.next()) };

    let l0_free = level_is_free(Level(0));
    let l0_count = version.l0_file_count();
    let l0_score = l0_count as f64 / config.l0_compaction_trigger as f64;

    // `l0_count > 0` keeps a zero slowdown trigger from force-picking an
    // empty L0, which would return `None` and starve deeper levels.
    let force_l0 = l0_free && l0_count > 0 && l0_count >= config.l0_slowdown_trigger;

    let (best_level, best_score) = if force_l0 {
        (Level(0), l0_score)
    } else {
        let mut best = (Level(0), 0.0f64);
        if l0_free && l0_score > best.1 {
            best = (Level(0), l0_score);
        }
        // Zipping with `1..=u8::MAX` yields the level number directly and
        // stops before it could wrap around to L0.
        for (level_no, &target) in (1..=u8::MAX).zip(config.level_target_bytes.iter()) {
            let level = Level(level_no);
            if !level_is_free(level) {
                continue;
            }
            if target > 0 {
                let score = version.level_bytes(level) as f64 / target as f64;
                // Strict `>` keeps the shallower level on ties.
                if score > best.1 {
                    best = (level, score);
                }
            }
        }
        // The score starts at 0.0 and only rises when a candidate is found,
        // so this also covers the "no candidate" case.
        if best.1 < 1.0 {
            return None;
        }
        best
    };

    let output_level = best_level.next();
    let max_bytes = config.max_compaction_bytes;
    let max_rows = config.max_compaction_input_rows;

    let input_files = if best_level == Level(0) {
        let mut l0: Vec<&crate::iceberg::version::DataFileMeta> =
            version.files_at(Level(0)).iter().collect();
        l0.sort_by_key(|f| f.meta.seq_min);
        select_within_caps(l0, max_bytes, max_rows)
    } else {
        select_within_caps(version.files_at(best_level).iter(), max_bytes, max_rows)
    };

    if input_files.is_empty() {
        return None;
    }

    Some(CompactionPick {
        input_level: best_level,
        output_level,
        score: best_score,
        input_files,
    })
}

/// Takes the longest prefix of `files` whose cumulative size stays within
/// `max_bytes` and whose cumulative row count stays within `max_rows`
/// (`max_rows == 0` disables the row cap), and returns the paths taken.
///
/// The first file is always taken, even if it alone exceeds a cap, so a
/// non-empty input always yields a non-empty result.
fn select_within_caps<'a>(
    files: impl IntoIterator<Item = &'a crate::iceberg::version::DataFileMeta>,
    max_bytes: u64,
    max_rows: u64,
) -> Vec<String> {
    let mut picked = Vec::new();
    let mut total_bytes = 0u64;
    let mut total_rows = 0u64;
    for f in files {
        let bytes_after = total_bytes.saturating_add(f.meta.file_size);
        let rows_after = total_rows.saturating_add(f.meta.num_rows);
        let over_cap = bytes_after > max_bytes || (max_rows > 0 && rows_after > max_rows);
        if over_cap && !picked.is_empty() {
            break;
        }
        total_bytes = bytes_after;
        total_rows = rows_after;
        picked.push(f.path.clone());
    }
    picked
}
/// Reports whether tombstones should be dropped for a compaction whose output
/// is `output_level`.
///
/// Returns `true` exactly when the numeric level of `output_level` is strictly
/// greater than `num_configured_levels`, and `false` otherwise.
#[inline]
pub fn should_drop_tombstones(output_level: Level, num_configured_levels: usize) -> bool {
    let output_index = usize::from(output_level.0);
    num_configured_levels < output_index
}
// Unit tests for `pick_compaction` and `should_drop_tombstones`.
//
// Most tests rely on `EngineConfig::default()`; the concrete default values
// are not visible in this file, so file counts below are presumably chosen to
// sit on the intended side of the default triggers — verify against
// `EngineConfig` when changing them.
#[cfg(test)]
mod tests {
use super::*;
use crate::iceberg::version::DataFileMeta;
use crate::types::{
level::{Level, ParquetFileMeta},
schema::{ColumnDef, ColumnType, TableSchema},
};
use std::sync::Arc;
// Minimal schema: one non-nullable Int64 column `id`, used as primary key.
fn test_schema() -> Arc<TableSchema> {
Arc::new(TableSchema {
table_name: "test".into(),
columns: vec![ColumnDef {
name: "id".into(),
col_type: ColumnType::Int64,
nullable: false,
..Default::default()
}],
primary_key: vec![0],
..Default::default()
})
}
// Builds a file fixture at `level` with the given row count and byte size.
// Sequence range is fixed at 1..=10 and key range at [0x01]..[0xFF]; tests
// that need distinct ordering overwrite `seq_min` / `key_min` afterwards.
fn make_file(path: &str, level: u8, num_rows: u64, file_size: u64) -> DataFileMeta {
DataFileMeta {
path: path.to_string(),
meta: ParquetFileMeta {
level: Level(level),
seq_min: 1,
seq_max: 10,
key_min: vec![0x01],
key_max: vec![0xFF],
num_rows,
file_size,
dv_path: None,
dv_offset: None,
dv_length: None,
format: None,
column_stats: None,
},
dv_path: None,
dv_offset: None,
dv_length: None,
}
}
// Busy set with no levels marked busy.
fn empty_busy() -> HashSet<Level> {
HashSet::new()
}
// An empty version has nothing to compact.
#[test]
fn no_compaction_needed() {
let v = Version::empty(test_schema());
let config = EngineConfig::default();
assert!(pick_compaction(&v, &config, &empty_busy()).is_none());
}
// Five small L0 files trigger an L0 -> L1 pick that takes all five
// (presumably 5 >= the default `l0_compaction_trigger` — TODO confirm).
#[test]
fn l0_compaction_trigger() {
let mut v = Version::empty(test_schema());
let mut l0_files = Vec::new();
for i in 0..5 {
l0_files.push(make_file(&format!("l0_{i}.parquet"), 0, 100, 1024));
}
v.levels.insert(Level(0), l0_files);
let config = EngineConfig::default();
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(pick.input_level, Level(0));
assert_eq!(pick.output_level, Level(1));
assert_eq!(pick.input_files.len(), 5);
}
// With 25 L0 files (presumably above the default slowdown trigger), L0 must
// be picked even though L2 holds 10 GiB.
#[test]
fn l0_priority_above_slowdown() {
let mut v = Version::empty(test_schema());
let config = EngineConfig::default();
let mut l0_files = Vec::new();
for i in 0..25 {
l0_files.push(make_file(&format!("l0_{i}.parquet"), 0, 100, 1024));
}
v.levels.insert(Level(0), l0_files);
let mut l2_files = Vec::new();
for i in 0..10 {
l2_files.push(make_file(
&format!("l2_{i}.parquet"),
2,
1_000_000,
1024 * 1024 * 1024, ));
}
v.levels.insert(Level(2), l2_files);
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(
pick.input_level,
Level(0),
"L0 above slowdown must be prioritized over deeper-level pressure"
);
}
// Same L2 pressure but only 5 L0 files: the pick goes by score, and L2 is
// expected to outscore L0.
#[test]
fn deeper_level_wins_when_l0_below_slowdown() {
let mut v = Version::empty(test_schema());
let config = EngineConfig::default();
let mut l0_files = Vec::new();
for i in 0..5 {
l0_files.push(make_file(&format!("l0_{i}.parquet"), 0, 100, 1024));
}
v.levels.insert(Level(0), l0_files);
let mut l2_files = Vec::new();
for i in 0..10 {
l2_files.push(make_file(
&format!("l2_{i}.parquet"),
2,
1_000_000,
1024 * 1024 * 1024,
));
}
v.levels.insert(Level(2), l2_files);
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(
pick.input_level,
Level(2),
"below slowdown, deeper level with higher score should win"
);
}
// With L0 and L1 marked busy, L0 is ineligible and the pick must fall to L2,
// whose input/output pair {L2, L3} does not overlap the busy set.
#[test]
fn picks_disjoint_level_when_l0_busy() {
let mut v = Version::empty(test_schema());
let config = EngineConfig::default();
for i in 0..5 {
v.levels.entry(Level(0)).or_default().push(make_file(
&format!("l0_{i}.parquet"),
0,
100,
1024,
));
}
for i in 0..10 {
v.levels.entry(Level(2)).or_default().push(make_file(
&format!("l2_{i}.parquet"),
2,
1_000_000,
1024 * 1024 * 1024,
));
}
let mut busy = HashSet::new();
busy.insert(Level(0));
busy.insert(Level(1));
let pick = pick_compaction(&v, &config, &busy).unwrap();
assert_eq!(
pick.input_level,
Level(2),
"L0 busy → worker B must be able to pick L2 (disjoint from {{L0,L1}})"
);
assert_eq!(pick.output_level, Level(3));
}
// Only L0 has files and it is busy, so there is nothing left to pick.
#[test]
fn returns_none_when_all_candidates_busy() {
let mut v = Version::empty(test_schema());
let config = EngineConfig::default();
for i in 0..5 {
v.levels.entry(Level(0)).or_default().push(make_file(
&format!("l0_{i}.parquet"),
0,
100,
1024,
));
}
let mut busy = HashSet::new();
busy.insert(Level(0));
busy.insert(Level(1));
assert!(
pick_compaction(&v, &config, &busy).is_none(),
"no other level needs compaction → must return None when L0 is busy"
);
}
// Twenty 64 MiB L0 files against a 256 MiB byte cap: the pick must be
// bounded by the cap but never empty.
#[test]
fn l0_pick_caps_at_max_compaction_bytes() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 256 * 1024 * 1024, ..EngineConfig::default()
};
for i in 0..20 {
let mut f = make_file(&format!("l0_{i}.parquet"), 0, 100, 64 * 1024 * 1024);
// Distinct, increasing seq ranges give the L0 sort a defined order.
f.meta.seq_min = (i + 1) as u64;
f.meta.seq_max = (i + 1) as u64 * 10;
v.levels.entry(Level(0)).or_default().push(f);
}
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(pick.input_level, Level(0));
assert!(
pick.input_files.len() <= 5,
"L0 pick must respect max_compaction_bytes; got {} files \
(cap=256MiB, 64MiB each)",
pick.input_files.len()
);
assert!(
!pick.input_files.is_empty(),
"L0 pick must always take at least one file to make progress"
);
}
// Row cap of 10_000 with 4096-row files: two files (8192 rows) fit, a third
// (12288 rows) would exceed the cap. The byte cap is set high enough not to
// interfere.
#[test]
fn pick_respects_max_compaction_input_rows() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 8 * 1024 * 1024 * 1024, max_compaction_input_rows: 10_000,
..EngineConfig::default()
};
for i in 0..20 {
let mut f = make_file(&format!("l0_{i}.parquet"), 0, 4096, 1024 * 1024);
f.meta.seq_min = (i + 1) as u64;
f.meta.seq_max = (i + 1) as u64 * 10;
v.levels.entry(Level(0)).or_default().push(f);
}
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
// Recompute the picked row total by looking each path up in L0.
let total_rows: u64 = pick
.input_files
.iter()
.map(|path| {
v.files_at(Level(0))
.iter()
.find(|f| f.path == *path)
.map(|f| f.meta.num_rows)
.unwrap_or(0)
})
.sum();
assert_eq!(pick.input_files.len(), 2, "row cap bounds L0 pick");
assert!(
total_rows <= 10_000 || pick.input_files.len() == 1,
"row total stays at/under cap (or progress-guaranteed single pick)"
);
}
// A row cap of 0 means "no row cap": all ten L0 files are picked.
#[test]
fn row_cap_of_zero_disables_enforcement() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 8 * 1024 * 1024 * 1024,
max_compaction_input_rows: 0, ..EngineConfig::default()
};
for i in 0..10 {
let mut f = make_file(&format!("l0_{i}.parquet"), 0, 4096, 1024 * 1024);
f.meta.seq_min = (i + 1) as u64;
f.meta.seq_max = (i + 1) as u64 * 10;
v.levels.entry(Level(0)).or_default().push(f);
}
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(
pick.input_files.len(),
10,
"row cap disabled → all L0 files picked (byte cap well above)"
);
}
// Each L2 file (512 MiB) already exceeds the 256 MiB cap, so exactly one
// file is picked.
#[test]
fn l1plus_pick_caps_at_max_compaction_bytes() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 256 * 1024 * 1024, ..EngineConfig::default()
};
for i in 0..20 {
let mut f = make_file(&format!("l2_{i}.parquet"), 2, 100_000, 512 * 1024 * 1024);
f.meta.key_min = vec![i as u8];
f.meta.key_max = vec![i as u8, 0xFF];
v.levels.entry(Level(2)).or_default().push(f);
}
// Order the level by key_min so the stored order is deterministic.
v.levels
.get_mut(&Level(2))
.unwrap()
.sort_by(|a, b| a.meta.key_min.cmp(&b.meta.key_min));
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(pick.input_level, Level(2));
assert_eq!(
pick.input_files.len(),
1,
"L2 pick must respect max_compaction_bytes; got {} files \
(cap=256MiB, 512MiB each, should pick exactly 1)",
pick.input_files.len()
);
}
// 100 MiB files against a 300 MiB cap: the pick is the first three files in
// stored (key_min-sorted) order, not an arbitrary subset.
#[test]
fn l1plus_pick_is_contiguous_prefix() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 300 * 1024 * 1024, ..EngineConfig::default()
};
for i in 0..30 {
let mut f = make_file(&format!("l2_{i:02}.parquet"), 2, 100_000, 100 * 1024 * 1024);
f.meta.key_min = vec![i as u8];
f.meta.key_max = vec![i as u8, 0xFF];
v.levels.entry(Level(2)).or_default().push(f);
}
v.levels
.get_mut(&Level(2))
.unwrap()
.sort_by(|a, b| a.meta.key_min.cmp(&b.meta.key_min));
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(pick.input_level, Level(2));
assert_eq!(pick.input_files.len(), 3, "expected 3 files under cap");
assert_eq!(pick.input_files[0], "l2_00.parquet");
assert_eq!(pick.input_files[1], "l2_01.parquet");
assert_eq!(pick.input_files[2], "l2_02.parquet");
}
// Even when every file is far larger than the byte cap (1 KiB), one file is
// still picked so compaction can make progress.
#[test]
fn l0_pick_always_picks_at_least_one() {
let mut v = Version::empty(test_schema());
let config = EngineConfig {
max_compaction_bytes: 1024, ..EngineConfig::default()
};
for i in 0..5 {
let mut f = make_file(&format!("l0_{i}.parquet"), 0, 100, 64 * 1024 * 1024);
f.meta.seq_min = (i + 1) as u64;
v.levels.entry(Level(0)).or_default().push(f);
}
let pick = pick_compaction(&v, &config, &empty_busy()).unwrap();
assert_eq!(pick.input_files.len(), 1, "must take at least one file");
}
// 30 L0 files (presumably above the default slowdown trigger) would normally
// force an L0 pick, but L0 is busy, so selection falls back to scoring and
// picks L2.
#[test]
fn l0_force_pick_respects_busy_l0() {
let mut v = Version::empty(test_schema());
let config = EngineConfig::default();
for i in 0..30 {
v.levels.entry(Level(0)).or_default().push(make_file(
&format!("l0_{i}.parquet"),
0,
100,
1024,
));
}
for i in 0..10 {
v.levels.entry(Level(2)).or_default().push(make_file(
&format!("l2_{i}.parquet"),
2,
1_000_000,
1024 * 1024 * 1024,
));
}
let mut busy = HashSet::new();
busy.insert(Level(0));
busy.insert(Level(1));
let pick = pick_compaction(&v, &config, &busy).unwrap();
assert_eq!(
pick.input_level,
Level(2),
"L0 busy → force-pick must defer to normal scoring and pick L2"
);
}
// `should_drop_tombstones` is true only when the output level number is
// strictly greater than the number of configured levels.
#[test]
fn tombstone_drop_only_at_true_bottom_level() {
let n = 4;
for output in 1..=n {
assert!(
!should_drop_tombstones(Level(output as u8), n),
"output L{output} must NOT drop tombstones (L{n} still has a target)"
);
}
assert!(
should_drop_tombstones(Level((n + 1) as u8), n),
"output L{} must drop tombstones (true bottom level)",
n + 1
);
// Boundary cases with zero and one configured level.
assert!(!should_drop_tombstones(Level(0), 0));
assert!(should_drop_tombstones(Level(1), 0));
assert!(!should_drop_tombstones(Level(1), 1));
assert!(should_drop_tombstones(Level(2), 1));
}
}