//! Level compaction: merges the picked input-level files (plus any overlapping
//! output-level files) into freshly written, non-overlapping output files and
//! commits the swap as a single snapshot transaction.

use std::{path::Path, sync::Arc};

use bytes::Bytes;
use roaring::RoaringBitmap;
use tracing::{debug, info, instrument, warn};

use crate::engine::{
    compaction::{
        iterator::{CompactionIterator, FileEntries},
        picker,
    },
    engine::MeruEngine,
};
use crate::iceberg::{version::DataFileMeta, DeletionVector, IcebergDataFile, SnapshotTransaction};
use crate::parquet::reader::ParquetReader;
use crate::types::{
    level::{Level, ParquetFileMeta},
    schema::TableSchema,
    value::{FieldValue, Row},
    MeruError, Result,
};
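
/// Opens one source parquet file and, when the manifest references a deletion
/// vector, decodes its roaring bitmap from the named Puffin blob. DV
/// coordinates are validated (sign, overflow, range) before slicing so a
/// corrupt manifest cannot cause an out-of-bounds read.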
fn open_source_file(
base: &Path,
file: &DataFileMeta,
schema: Arc<TableSchema>,
) -> Result<(ParquetReader<Bytes>, Option<RoaringBitmap>)> {
let abs_parquet = base.join(&file.path);
let parquet_bytes = std::fs::read(&abs_parquet).map_err(MeruError::Io)?;
let reader = ParquetReader::open(Bytes::from(parquet_bytes), schema)?;
let dv = match (&file.dv_path, file.dv_offset, file.dv_length) {
(Some(dv_path), Some(offset), Some(length)) => {
let abs_dv = base.join(dv_path);
let puffin_bytes = std::fs::read(&abs_dv).map_err(MeruError::Io)?;
if offset < 0 || length < 0 {
return Err(MeruError::Corruption(format!(
"DV has negative offset ({offset}) or length ({length}) on file {}",
file.path,
)));
}
let start = offset as usize;
let end = start
.checked_add(length as usize)
.ok_or_else(|| MeruError::Corruption("DV offset+length overflow".into()))?;
if end > puffin_bytes.len() {
return Err(MeruError::Corruption(format!(
"DV blob out of range: path={dv_path} offset={offset} length={length} puffin_len={}",
puffin_bytes.len()
)));
}
let dv = DeletionVector::from_puffin_blob(&puffin_bytes[start..end])?;
Some(dv.bitmap().clone())
}
(None, None, None) => None,
_ => {
return Err(MeruError::Corruption(format!(
"inconsistent DV coords on file {}: dv_path={:?} dv_offset={:?} dv_length={:?}",
file.path, file.dv_path, file.dv_offset, file.dv_length
)));
}
};
Ok((reader, dv))
}
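
/// Rough per-row size estimate used only for output-file sizing; it ignores
/// parquet encoding and compression entirely.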
fn estimate_row_bytes(row: &Row) -> u64 {
let mut total: u64 = 0;
for fv in row.fields.iter().flatten() {
total += match fv {
FieldValue::Boolean(_) => 1,
FieldValue::Int32(_) | FieldValue::Float(_) => 4,
FieldValue::Int64(_) | FieldValue::Double(_) => 8,
FieldValue::Bytes(b) => b.len() as u64 + 8,
};
}
total
}
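
/// Soft cap on the logical (pre-encoding) size of one compaction output file.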
const TARGET_OUTPUT_FILE_BYTES: u64 = 512 * 1024 * 1024;
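
/// Accumulates merged rows together with the running stats (seq range, key
/// range, approximate size) needed to build the output file's metadata.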
struct OutputChunk {
rows: Vec<(crate::types::key::InternalKey, Row)>,
seq_min: u64,
seq_max: u64,
key_min: Vec<u8>,
key_max: Vec<u8>,
approx_bytes: u64,
last_user_key: Vec<u8>,
}
impl OutputChunk {
fn empty() -> Self {
Self {
rows: Vec::new(),
seq_min: u64::MAX,
seq_max: 0,
key_min: Vec::new(),
key_max: Vec::new(),
approx_bytes: 0,
last_user_key: Vec::new(),
}
}
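
    /// Folds one row into the chunk, maintaining seq/key bounds and the size
    /// estimate; `last_user_key` lets the caller avoid splitting versions of
    /// one user key across output files.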
fn push(&mut self, ikey: crate::types::key::InternalKey, row: Row, est_bytes: u64) {
let uk = ikey.user_key_bytes().to_vec();
let s = ikey.seq.0;
if s < self.seq_min {
self.seq_min = s;
}
if s > self.seq_max {
self.seq_max = s;
}
        if self.key_min.is_empty() || uk.as_slice() < self.key_min.as_slice() {
            self.key_min.clone_from(&uk);
        }
        if uk.as_slice() > self.key_max.as_slice() {
            self.key_max.clone_from(&uk);
        }
self.approx_bytes = self.approx_bytes.saturating_add(est_bytes);
self.last_user_key = uk;
self.rows.push((ikey, row));
}
}
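
/// Smallest key range covering every input file, used to find output-level
/// files that overlap the compaction and must be pulled in and rewritten.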
fn compute_union_range(files: &[DataFileMeta]) -> (Option<Vec<u8>>, Option<Vec<u8>>) {
let mut union_min: Option<Vec<u8>> = None;
let mut union_max: Option<Vec<u8>> = None;
for f in files {
let km = &f.meta.key_min;
let kx = &f.meta.key_max;
match &union_min {
None => union_min = Some(km.clone()),
Some(cur) if km.as_slice() < cur.as_slice() => union_min = Some(km.clone()),
_ => {}
}
match &union_max {
None => union_max = Some(kx.clone()),
Some(cur) if kx.as_slice() > cur.as_slice() => union_max = Some(kx.clone()),
_ => {}
}
}
(union_min, union_max)
}
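
/// RAII guard marking levels as busy for the lifetime of a compaction job;
/// dropping it (on success, error, or panic) releases the reservation.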
struct LevelReservation {
engine: Arc<MeruEngine>,
levels: Vec<Level>,
}
impl Drop for LevelReservation {
fn drop(&mut self) {
let levels = std::mem::take(&mut self.levels);
if levels.is_empty() {
return;
}
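        // `Drop` is synchronous but the busy set lives behind an async mutex:
        // try the uncontended fast path first, then fall back to a spawned
        // task so the reservation is released even if the lock is held now.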
let acquired = {
let try_result = self.engine.compacting_levels.try_lock();
match try_result {
Ok(mut guard) => {
for l in &levels {
guard.remove(l);
}
self.engine
.compacting_levels_len
.store(guard.len(), std::sync::atomic::Ordering::Relaxed);
true
}
Err(_) => false,
}
};
if !acquired {
let engine = self.engine.clone();
tokio::spawn(async move {
let mut guard = engine.compacting_levels.lock().await;
for l in &levels {
guard.remove(l);
}
engine
.compacting_levels_len
.store(guard.len(), std::sync::atomic::Ordering::Relaxed);
});
}
}
}
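
/// Drains compaction pressure: runs jobs until the picker finds nothing to do,
/// bounded by an iteration cap as a safety valve.
///
/// A typical (hypothetical) call site from a background task:
///
/// ```ignore
/// // `engine` is an `Arc<MeruEngine>`; errors are logged, not fatal.
/// if let Err(e) = run_compaction(&engine).await {
///     tracing::warn!(error = %e, "compaction job failed");
/// }
/// ```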
#[instrument(skip(engine), fields(op = "compaction"))]
pub async fn run_compaction(engine: &Arc<MeruEngine>) -> Result<()> {
const MAX_ITERATIONS: usize = 128;
for iter in 0..MAX_ITERATIONS {
let did_work = run_one_compaction_job(engine).await?;
if !did_work {
if iter > 0 {
debug!(iterations = iter, "compaction drained all pressure");
}
return Ok(());
}
}
    warn!(
        max = MAX_ITERATIONS,
        "compaction loop hit iteration cap; will resume on next trigger"
    );
Ok(())
}
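
/// Picks the next compaction and marks its input/output levels busy, all under
/// the `compacting_levels` lock so two jobs can never reserve the same level.
/// Returns the pick, a pinned clone of the current version, and an RAII guard
/// that releases the reservation on drop (including early-error paths).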
async fn reserve_next_compaction(
engine: &Arc<MeruEngine>,
) -> Option<(
picker::CompactionPick,
Arc<crate::iceberg::version::Version>,
LevelReservation,
)> {
let mut busy = engine.compacting_levels.lock().await;
let version_guard = engine.version_set.current();
let pick = picker::pick_compaction(&version_guard, &engine.config, &busy)?;
let version: Arc<crate::iceberg::version::Version> = (*version_guard).clone();
drop(version_guard);
let input_level = pick.input_level;
let output_level = pick.output_level;
busy.insert(input_level);
busy.insert(output_level);
engine
.compacting_levels_len
.store(busy.len(), std::sync::atomic::Ordering::Relaxed);
drop(busy);
let reservation = LevelReservation {
engine: engine.clone(),
levels: vec![input_level, output_level],
};
Some((pick, version, reservation))
}
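
/// Runs one compaction job end to end: merge the picked input files (plus any
/// overlapping output-level files), write target-sized output files, commit
/// the swap as a single snapshot, and schedule the old files for deferred
/// deletion. Returns `Ok(true)` if a job actually ran.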
async fn run_one_compaction_job(engine: &Arc<MeruEngine>) -> Result<bool> {
let (pick, version, _reservation) = match reserve_next_compaction(engine).await {
Some(r) => r,
None => {
debug!("no compaction needed (or all candidates busy)");
return Ok(false);
}
};
let job_started_at = std::time::Instant::now();
let output_level_str = format!("L{}", pick.output_level.0);
info!(
input_level = pick.input_level.0,
output_level = pick.output_level.0,
score = pick.score,
input_files = pick.input_files.len(),
"starting compaction"
);
let input_selection: std::collections::HashSet<&str> =
pick.input_files.iter().map(|s| s.as_str()).collect();
let input_file_metas: Vec<DataFileMeta> = version
.files_at(pick.input_level)
.iter()
.filter(|f| input_selection.contains(f.path.as_str()))
.cloned()
.collect();
if input_file_metas.is_empty() {
debug!("compaction picked an empty input level");
return Ok(false);
}
let (union_min, union_max) = compute_union_range(&input_file_metas);
let overlap_output_metas: Vec<DataFileMeta> = if let (Some(umin), Some(umax)) =
(union_min.as_ref(), union_max.as_ref())
{
version
.files_at(pick.output_level)
.iter()
.filter(|f| {
(f.meta.key_min.is_empty() || f.meta.key_min.as_slice() <= umax.as_slice())
&& (f.meta.key_max.is_empty() || f.meta.key_max.as_slice() >= umin.as_slice())
})
.cloned()
.collect()
} else {
Vec::new()
};
if !overlap_output_metas.is_empty() {
info!(
output_level = pick.output_level.0,
overlap_count = overlap_output_metas.len(),
"pulling overlapping output-level files into compaction to preserve non-overlap invariant"
);
}
let base = engine.catalog.base_path();
let all_source_metas: Vec<&DataFileMeta> = input_file_metas
.iter()
.chain(overlap_output_metas.iter())
.collect();
let mut file_entries: Vec<FileEntries> = Vec::with_capacity(all_source_metas.len());
for (file_idx, file_meta) in all_source_metas.iter().enumerate() {
let (reader, dv) = open_source_file(base, file_meta, engine.schema.clone())?;
let physical = reader.read_physical_rows_with_positions(dv.as_ref())?;
file_entries.push(FileEntries {
file_idx,
entries: physical,
});
}
let read_seq = engine.read_seq();
let drop_tombstones =
picker::should_drop_tombstones(pick.output_level, engine.config.level_target_bytes.len());
let iter = CompactionIterator::new(file_entries, read_seq, drop_tombstones);
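    // An empty merge is still useful work: the commit below removes the input
    // files, so fully-deleted data stops being carried forward.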
if iter.is_empty() {
debug!(
input_level = pick.input_level.0,
"compaction produced no output rows"
);
}
let mut chunks: Vec<OutputChunk> = Vec::new();
let mut current = OutputChunk::empty();
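    // Greedy chunking: cut a new output file once the size target is exceeded,
    // but never split versions of the same user key across two files.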
for entry in iter {
        let uk = entry.ikey.user_key_bytes();
        let est = estimate_row_bytes(&entry.row) + uk.len() as u64 + 16;
if !current.rows.is_empty()
&& current.approx_bytes.saturating_add(est) > TARGET_OUTPUT_FILE_BYTES
&& current.last_user_key.as_slice() != uk
{
chunks.push(std::mem::replace(&mut current, OutputChunk::empty()));
}
current.push(entry.ikey, entry.row, est);
}
if !current.rows.is_empty() {
chunks.push(current);
}
let total_output_rows: u64 = chunks.iter().map(|c| c.rows.len() as u64).sum();
let mut txn = SnapshotTransaction::new();
if !chunks.is_empty() {
engine.catalog.ensure_level_dir(pick.output_level).await?;
}
for chunk in chunks {
if chunk.rows.is_empty() {
continue;
}
let chunk_rows = chunk.rows.len() as u64;
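        // Defensive: the u64::MAX sentinel only survives an empty chunk,
        // which was skipped above.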
let seq_min = if chunk.seq_min == u64::MAX {
0
} else {
chunk.seq_min
};
let seq_max = chunk.seq_max;
let key_min = chunk.key_min;
let key_max = chunk.key_max;
let file_id = uuid::Uuid::new_v4().to_string();
let output_path = format!("data/L{}/{file_id}.parquet", pick.output_level.0);
let format = engine.config.file_format_for(pick.output_level);
let (parquet_bytes, _, writer_meta) = crate::parquet::writer::write_sorted_rows(
chunk.rows,
engine.schema.clone(),
pick.output_level,
format,
engine.config.bloom_bits_per_key,
)?;
if parquet_bytes.is_empty() {
return Err(MeruError::Parquet(
"writer returned empty bytes for non-empty row set".into(),
));
}
let file_size = parquet_bytes.len() as u64;
let full_path = engine.catalog.data_file_path(pick.output_level, &file_id);
if let Some(parent) = full_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(MeruError::Io)?;
}
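        // Durability: write the bytes, fsync the file, then fsync the parent
        // directory so the new directory entry itself survives a crash.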
tokio::fs::write(&full_path, &parquet_bytes)
.await
.map_err(MeruError::Io)?;
tokio::fs::File::open(&full_path)
.await
.map_err(MeruError::Io)?
.sync_all()
.await
.map_err(MeruError::Io)?;
if let Some(parent) = full_path.parent() {
if let Ok(dir) = tokio::fs::File::open(parent).await {
let _ = dir.sync_all().await;
}
}
let meta = ParquetFileMeta {
level: pick.output_level,
seq_min,
seq_max,
key_min,
key_max,
num_rows: chunk_rows,
file_size,
dv_path: None,
dv_offset: None,
dv_length: None,
format: Some(format),
column_stats: writer_meta.column_stats,
};
txn.add_file(IcebergDataFile {
path: output_path,
file_size,
num_rows: chunk_rows,
meta,
});
}
for file_meta in &input_file_metas {
txn.remove_file(file_meta.path.clone());
}
for file_meta in &overlap_output_metas {
txn.remove_file(file_meta.path.clone());
}
txn.set_prop("merutable.job", "compaction");
txn.set_prop("merutable.input_level", pick.input_level.0.to_string());
txn.set_prop("merutable.output_level", pick.output_level.0.to_string());
let new_version = {
let _commit_guard = engine.commit_lock.lock().await;
let commit_started = std::time::Instant::now();
let v = engine.catalog.commit(&txn, engine.schema.clone()).await?;
crate::engine::metrics::record(
crate::engine::metrics::COMMIT_DURATION_SECONDS,
commit_started.elapsed().as_secs_f64(),
);
v
};
engine.version_set.install(new_version);
engine.l0_drained.notify_waiters();
crate::engine::metrics::inc_labeled(
crate::engine::metrics::COMPACTIONS_TOTAL,
"input_level",
pick.input_level.0.to_string(),
);
crate::engine::metrics::inc(crate::engine::metrics::SNAPSHOTS_COMMITTED_TOTAL);
if !overlap_output_metas.is_empty() {
crate::engine::metrics::inc_by(
crate::engine::metrics::OVERLAP_PULLINS_TOTAL,
overlap_output_metas.len() as u64,
);
}
let output_bytes_total: u64 = txn.adds.iter().map(|f| f.file_size).sum();
crate::engine::metrics::record_labeled(
crate::engine::metrics::COMPACTION_DURATION_SECONDS,
"output_level",
output_level_str,
job_started_at.elapsed().as_secs_f64(),
);
crate::engine::metrics::record(
crate::engine::metrics::COMPACTION_OUTPUT_BYTES,
output_bytes_total as f64,
);
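    // Compaction rewrote the underlying files, so cached rows may point at
    // parquet that is about to be deleted; clearing everything is the simple,
    // conservative invalidation.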
if let Some(ref cache) = engine.row_cache {
cache.clear();
}
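    // The inputs (and their DV sidecars) are unreachable from the new version,
    // but readers may still be pinned to the old snapshot, so deletion is
    // deferred rather than immediate.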
let mut obsoleted_paths: Vec<std::path::PathBuf> = Vec::new();
for file_meta in input_file_metas.iter().chain(overlap_output_metas.iter()) {
obsoleted_paths.push(base.join(&file_meta.path));
if let Some(ref dv) = file_meta.dv_path {
obsoleted_paths.push(base.join(dv));
}
}
engine
.enqueue_for_deletion(obsoleted_paths, version.snapshot_id)
.await;
engine.gc_pending_deletions().await;
info!(
input_level = pick.input_level.0,
output_level = pick.output_level.0,
output_rows = total_output_rows,
"compaction committed"
);
Ok(true)
}