use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use anyhow::{anyhow, Context, Result};
use arrow::array::{Array, RecordBatch, StringArray};
use arrow::compute::concat_batches;
use iceberg::spec::{
DataContentType, DataFile, DataFileFormat, ManifestFile, ManifestListWriter,
ManifestWriterBuilder, Operation, Snapshot, SnapshotReference, SnapshotRetention, Struct,
Summary,
};
use iceberg::table::Table as IceTable;
use iceberg::writer::base_writer::data_file_writer::DataFileWriterBuilder;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
use iceberg::writer::file_writer::rolling_writer::RollingFileWriterBuilder;
use iceberg::writer::file_writer::ParquetWriterBuilder;
use iceberg::writer::{IcebergWriter, IcebergWriterBuilder};
use iceberg::{Catalog, TableIdent, TableRequirement, TableUpdate};
use skade::parquet::file::properties::WriterProperties;
use skade_katalog::RedbCatalog;
use super::iceberg::{
warehouse_compression, warehouse_sort_cols, IcebergWarehouse,
};
/// Outcome of one table compaction attempt, as filled in by `compact_table_async`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct CompactReport {
/// Name of the table the report describes.
pub table: String,
/// Number of data files found in the current snapshot's manifests before compaction.
pub files_before: usize,
/// Newly written files plus files carried over unchanged; stays 0 when nothing was committed.
pub files_after: usize,
/// Total rows read from the table before compaction.
pub rows: u64,
/// Number of partition groups that were rewritten.
pub partitions: usize,
/// `true` when no replacement snapshot was committed for this table.
pub skipped: bool,
}
/// Outcome of one snapshot-expiry run, as filled in by `expire_snapshots_async`.
#[derive(Debug, Clone, Default, serde::Serialize)]
pub struct ExpireReport {
/// Name of the table the report describes.
pub table: String,
/// Number of snapshots present in the table metadata before expiry.
pub snapshots_before: usize,
/// Number of snapshots selected for retention.
pub snapshots_after: usize,
/// Number of orphaned files removed from disk by the garbage-collection pass.
pub files_deleted: usize,
/// Total size in bytes of the removed files (0 for a file whose size could not be read).
pub bytes_reclaimed: u64,
}
/// Runs compaction over a set of tables and returns one [`CompactReport`] per table.
///
/// * `only_table` — when set, exactly that table is processed (no sort-column filter is
///   applied to it). When `None`, every table returned by `wh.table_names()` whose
///   `warehouse_sort_cols` list is non-empty is processed.
/// * `only_repo` — forwarded unchanged to [`compact_table`].
///
/// # Errors
/// Returns the first error raised while listing tables or compacting a table; tables after
/// the failing one are not attempted.
pub fn compact_all(
    wh: &IcebergWarehouse,
    only_table: Option<&str>,
    only_repo: Option<&str>,
) -> Result<Vec<CompactReport>> {
    let targets: Vec<String> = if let Some(name) = only_table {
        vec![name.to_owned()]
    } else {
        wh.table_names()?
            .into_iter()
            .filter(|name| !warehouse_sort_cols(name).is_empty())
            .collect()
    };
    // Collecting into `Result` stops at the first failing table, like an explicit `?` loop.
    targets
        .iter()
        .map(|name| compact_table(wh, name, only_repo))
        .collect()
}
/// Blocking entry point for compacting a single table.
///
/// Resolves the catalog handle, table identifier and sort columns for `table`, then drives
/// `compact_table_async` to completion on the warehouse's `block_on`.
///
/// # Errors
/// Propagates any error from the asynchronous compaction.
pub fn compact_table(
    wh: &IcebergWarehouse,
    table: &str,
    only_repo: Option<&str>,
) -> Result<CompactReport> {
    let catalog = wh.catalog().clone();
    let ident = wh.table_ident(table);
    let sort_cols = warehouse_sort_cols(table);
    let job = async move {
        compact_table_async(catalog.as_ref(), &ident, table, sort_cols, only_repo).await
    };
    wh.block_on(job)
}
/// Rewrites the current snapshot of one table as sorted files grouped by partition value.
///
/// Steps:
/// 1. Load the table and walk the current snapshot's manifests, collecting the live data
///    files and the manifests that contain no data entries (those are carried forward).
/// 2. Read every row, group rows by the first partition column (optionally restricted to
///    `only_repo`), sort each group by `sort_cols` and write it out.
/// 3. When `only_repo` is set, keep the existing files of partitions that were not rewritten.
/// 4. Commit a replacement snapshot.
///
/// The returned report has `skipped == true` whenever nothing was committed: no sort
/// columns, no current snapshot, at most one live data file, no rows, or no matching group.
///
/// # Errors
/// Propagates catalog, manifest-loading, read, sort, write and commit failures.
async fn compact_table_async(
    catalog: &RedbCatalog,
    ident: &TableIdent,
    table: &str,
    sort_cols: &[&str],
    only_repo: Option<&str>,
) -> Result<CompactReport> {
    let mut report = CompactReport {
        table: table.to_string(),
        skipped: true,
        ..Default::default()
    };
    if sort_cols.is_empty() {
        return Ok(report);
    }
    let tbl = catalog.load_table(ident).await?;
    let metadata = tbl.metadata();
    let Some(current) = metadata.current_snapshot() else {
        return Ok(report);
    };
    let file_io = tbl.file_io().clone();
    let manifest_list = current.load_manifest_list(&file_io, metadata).await?;

    let mut nondata_manifests: Vec<ManifestFile> = Vec::new();
    let mut live_data_files: Vec<DataFile> = Vec::new();
    for mf in manifest_list.entries() {
        let manifest = mf.load_manifest(&file_io).await?;
        let mut has_data = false;
        for entry in manifest.entries() {
            if entry.content_type() != DataContentType::Data {
                continue;
            }
            // Any data entry (alive or not) marks this manifest as a data manifest, which
            // is superseded by the manifest written at commit time.
            has_data = true;
            // Entries with status Deleted are not part of the table any more; counting
            // them (or carrying them forward below) would bring removed files back.
            if entry.is_alive() {
                live_data_files.push(entry.data_file().clone());
            }
        }
        if !has_data {
            nondata_manifests.push(mf.clone());
        }
    }
    report.files_before = live_data_files.len();

    let part_col = partition_column(metadata);
    let batches = skade::read_all(&tbl).await?;
    report.rows = batches.iter().map(|b| b.num_rows() as u64).sum();
    if live_data_files.len() <= 1 || report.rows == 0 {
        return Ok(report);
    }
    let groups = group_by_partition(&batches, part_col.as_deref(), only_repo)?;
    if groups.is_empty() {
        return Ok(report);
    }

    let mut new_files: Vec<DataFile> = Vec::new();
    let mut compacted_partitions: HashSet<Option<String>> = HashSet::new();
    for (part_value, batch) in &groups {
        let sorted = super::iceberg::sort_batch_by_pub(batch, sort_cols)?;
        new_files.extend(write_one_file(&tbl, &sorted, part_value.as_deref()).await?);
        compacted_partitions.insert(part_value.clone());
        report.partitions += 1;
    }

    // A repo-restricted run only rewrote some partitions; the files of every other
    // partition must survive into the new snapshot untouched.
    let carried_old: Vec<DataFile> = if only_repo.is_some() {
        live_data_files
            .iter()
            .filter(|df| {
                let pv = data_file_partition_value(df, part_col.as_deref());
                !compacted_partitions.contains(&pv)
            })
            .cloned()
            .collect()
    } else {
        Vec::new()
    };

    if new_files.is_empty() {
        return Ok(report);
    }
    report.files_after = new_files.len() + carried_old.len();
    report.skipped = false;
    commit_replace_snapshot(catalog, &tbl, new_files, carried_old, nondata_manifests).await?;
    Ok(report)
}
/// Writes `batch` under the table's `<location>/data` directory and returns the resulting
/// data-file descriptors.
///
/// Files are written as Parquet with the warehouse compression setting, through a rolling
/// writer with the default target file size, and named with a fresh unique prefix plus a
/// `compact` suffix. When `part_value` is `Some`, the files are tagged with a partition key
/// built from that value.
///
/// # Errors
/// Propagates partition-key construction, writer build, write and close failures.
async fn write_one_file(
    tbl: &IceTable,
    batch: &RecordBatch,
    part_value: Option<&str>,
) -> Result<Vec<DataFile>> {
    let table_meta = tbl.metadata();
    let writer_props = WriterProperties::builder()
        .set_compression(warehouse_compression())
        .build();
    let parquet = ParquetWriterBuilder::new(writer_props, table_meta.current_schema().clone());
    let rolling = RollingFileWriterBuilder::new_with_default_file_size(
        parquet,
        tbl.file_io().clone(),
        DefaultLocationGenerator::with_data_location(format!("{}/data", table_meta.location())),
        DefaultFileNameGenerator::new(
            unique_prefix(),
            Some("compact".into()),
            DataFileFormat::Parquet,
        ),
    );
    let partition = part_value
        .map(|value| partition_key_for_value(tbl, value))
        .transpose()?;
    let mut writer = DataFileWriterBuilder::new(rolling).build(partition).await?;
    writer.write(batch.clone()).await?;
    Ok(writer.close().await?)
}
/// Publishes a replacement snapshot whose data is exactly `new_files` plus `carried_old`.
///
/// Writes one v2 data manifest listing every file at the next sequence number, then a
/// manifest list holding that manifest followed by `nondata_manifests`, and finally commits
/// a `Replace` snapshot that moves the `main` branch. The commit requires `main` to still
/// point at the snapshot that was current when `tbl` was loaded.
///
/// # Errors
/// Propagates manifest and manifest-list write failures; a failed catalog commit is
/// reported with the context `commit compaction replace snapshot`.
async fn commit_replace_snapshot(
    catalog: &RedbCatalog,
    tbl: &IceTable,
    new_files: Vec<DataFile>,
    carried_old: Vec<DataFile>,
    nondata_manifests: Vec<ManifestFile>,
) -> Result<()> {
    let metadata = tbl.metadata();
    let file_io = tbl.file_io().clone();
    let schema = metadata.current_schema().clone();
    let location = metadata.location().to_string();
    let parent_id = metadata.current_snapshot().map(|snap| snap.snapshot_id());
    let sequence = metadata.last_sequence_number() + 1;
    let snapshot_id = fresh_snapshot_id(parent_id);

    // Data manifest: every file that belongs to the new snapshot.
    let manifest_path = format!("{location}/metadata/compact-m-{snapshot_id}.avro");
    let mut manifest_writer = ManifestWriterBuilder::new(
        file_io.new_output(&manifest_path)?,
        Some(snapshot_id),
        None,
        schema.clone(),
        metadata.default_partition_spec().as_ref().clone(),
    )
    .build_v2_data();
    let mut total_rows: u64 = 0;
    for file in new_files.into_iter().chain(carried_old) {
        total_rows += file.record_count();
        manifest_writer.add_file(file, sequence)?;
    }
    let data_manifest: ManifestFile = manifest_writer.write_manifest_file().await?;

    // Manifest list: the fresh data manifest first, then the carried non-data manifests.
    let list_path = format!("{location}/metadata/snap-{snapshot_id}.avro");
    let mut list_writer = ManifestListWriter::v2(
        file_io.new_output(&list_path)?,
        snapshot_id,
        parent_id,
        sequence,
    );
    list_writer.add_manifests(std::iter::once(data_manifest))?;
    list_writer.add_manifests(nondata_manifests.into_iter())?;
    list_writer.close().await?;

    let summary = Summary {
        operation: Operation::Replace,
        additional_properties: HashMap::from([(
            "nornir.compacted".to_string(),
            "true".to_string(),
        )]),
    };
    let snapshot = Snapshot::builder()
        .with_snapshot_id(snapshot_id)
        .with_parent_snapshot_id(parent_id)
        .with_sequence_number(sequence)
        .with_timestamp_ms(now_ms())
        .with_manifest_list(list_path)
        .with_row_range(metadata.next_row_id(), total_rows)
        .with_schema_id(schema.schema_id())
        .with_summary(summary)
        .build();

    let head = SnapshotReference {
        snapshot_id,
        retention: SnapshotRetention::Branch {
            min_snapshots_to_keep: None,
            max_snapshot_age_ms: None,
            max_ref_age_ms: None,
        },
    };
    // Optimistic concurrency: refuse to commit if `main` moved since the table was loaded.
    let requirements = vec![TableRequirement::RefSnapshotIdMatch {
        r#ref: "main".into(),
        snapshot_id: parent_id,
    }];
    let updates = vec![
        TableUpdate::AddSnapshot { snapshot },
        TableUpdate::SetSnapshotRef {
            ref_name: "main".into(),
            reference: head,
        },
    ];
    catalog
        .commit_table(tbl.identifier().clone(), requirements, updates)
        .await
        .context("commit compaction replace snapshot")?;
    Ok(())
}
/// Default snapshot-retention count (10).
///
/// NOTE(review): not referenced elsewhere in this file; presumably callers pass it as the
/// `keep_last` argument of `expire_all` / `expire_snapshots` — confirm at the call sites.
pub const DEFAULT_KEEP_LAST: usize = 10;
/// Expires old snapshots on a set of tables and returns one [`ExpireReport`] per table.
///
/// * `only_table` — when set, only that table is processed; otherwise every table returned
///   by `wh.table_names()`.
/// * `keep_last`, `keep_days` — retention settings forwarded to [`expire_snapshots`].
///
/// # Errors
/// Returns the first error raised while listing tables or expiring a table; tables after
/// the failing one are not attempted.
pub fn expire_all(
    wh: &IcebergWarehouse,
    only_table: Option<&str>,
    keep_last: usize,
    keep_days: Option<u64>,
) -> Result<Vec<ExpireReport>> {
    let targets: Vec<String> = if let Some(name) = only_table {
        vec![name.to_owned()]
    } else {
        wh.table_names()?
    };
    // Collecting into `Result` stops at the first failing table, like an explicit `?` loop.
    targets
        .iter()
        .map(|name| expire_snapshots(wh, name, keep_last, keep_days))
        .collect()
}
/// Blocking entry point for expiring snapshots of a single table.
///
/// Captures owned copies of the catalog handle, table identifier, warehouse root and table
/// name, then drives `expire_snapshots_async` to completion on the warehouse's `block_on`.
///
/// # Errors
/// Propagates any error from the asynchronous expiry.
pub fn expire_snapshots(
    wh: &IcebergWarehouse,
    table: &str,
    keep_last: usize,
    keep_days: Option<u64>,
) -> Result<ExpireReport> {
    let catalog = wh.catalog().clone();
    let ident = wh.table_ident(table);
    let root = wh.root().to_path_buf();
    let table_name = table.to_owned();
    let job = async move {
        expire_snapshots_async(
            catalog.as_ref(),
            &ident,
            &table_name,
            &root,
            keep_last,
            keep_days,
        )
        .await
    };
    wh.block_on(job)
}
async fn expire_snapshots_async(
catalog: &RedbCatalog,
ident: &TableIdent,
table: &str,
root: &Path,
keep_last: usize,
keep_days: Option<u64>,
) -> Result<ExpireReport> {
let mut report = ExpireReport { table: table.to_string(), ..Default::default() };
let tbl = catalog.load_table(ident).await?;
let metadata = tbl.metadata();
let file_io = tbl.file_io().clone();
let mut snaps: Vec<&Snapshot> = metadata.snapshots().map(|s| s.as_ref()).collect();
snaps.sort_by_key(|s| std::cmp::Reverse(s.timestamp_ms()));
report.snapshots_before = snaps.len();
if snaps.is_empty() {
return Ok(report);
}
let keep_last = keep_last.max(1); let cutoff_ms = keep_days.map(|d| now_ms() - (d as i64) * 86_400_000);
let current_id = metadata.current_snapshot_id();
let mut retained_ids: HashSet<i64> = HashSet::new();
for (i, s) in snaps.iter().enumerate() {
let within_n = i < keep_last;
let within_days = cutoff_ms.map(|c| s.timestamp_ms() >= c).unwrap_or(false);
let is_current = Some(s.snapshot_id()) == current_id;
if within_n || within_days || is_current {
retained_ids.insert(s.snapshot_id());
}
}
let remove_ids: Vec<i64> = snaps
.iter()
.map(|s| s.snapshot_id())
.filter(|id| !retained_ids.contains(id))
.collect();
report.snapshots_after = retained_ids.len();
if remove_ids.is_empty() {
return Ok(report); }
let mut live: HashSet<String> = HashSet::new();
for s in snaps.iter().filter(|s| retained_ids.contains(&s.snapshot_id())) {
live.insert(canon(s.manifest_list()));
let ml = s.load_manifest_list(&file_io, metadata).await?;
for mf in ml.entries() {
live.insert(canon(&mf.manifest_path));
let manifest = mf.load_manifest(&file_io).await?;
for entry in manifest.entries() {
live.insert(canon(entry.data_file().file_path()));
}
}
}
catalog
.commit_table(
ident.clone(),
Vec::new(),
vec![TableUpdate::RemoveSnapshots { snapshot_ids: remove_ids.clone() }],
)
.await
.context("commit snapshot removal")?;
let (deleted, bytes) = gc_orphans(root, table, &live)?;
report.files_deleted = deleted;
report.bytes_reclaimed = bytes;
Ok(report)
}
/// Deletes files of `table` that are not referenced by the `live` set.
///
/// Candidates are the `.parquet` files under `<root>/warehouse/nornir/<table>/data` and the
/// `.avro` files under `.../metadata` (searched recursively). A candidate is kept when its
/// canonicalised path is contained in `live`.
///
/// Returns `(files_deleted, bytes_reclaimed)`; a file whose size cannot be read counts as
/// zero bytes.
///
/// # Errors
/// Stops at the first file that cannot be removed and reports it with the context
/// `delete orphan <path>`.
fn gc_orphans(root: &Path, table: &str, live: &HashSet<String>) -> Result<(usize, u64)> {
    let base = match root.canonicalize() {
        Ok(resolved) => resolved,
        Err(_) => root.to_path_buf(),
    };
    let table_dir = base.join("warehouse").join("nornir").join(table);

    let mut candidates: Vec<PathBuf> = Vec::new();
    collect_files(&table_dir.join("data"), &mut candidates, &["parquet"]);
    collect_files(&table_dir.join("metadata"), &mut candidates, &["avro"]);

    let (mut deleted, mut bytes) = (0usize, 0u64);
    let orphans = candidates
        .iter()
        .filter(|path| !live.contains(&canon(&path.to_string_lossy())));
    for orphan in orphans {
        // Read the size first: it is gone once the file is removed.
        let size = std::fs::metadata(orphan).map_or(0, |meta| meta.len());
        std::fs::remove_file(orphan)
            .with_context(|| format!("delete orphan {}", orphan.display()))?;
        deleted += 1;
        bytes += size;
    }
    Ok((deleted, bytes))
}
/// Returns the name of the schema column behind the first field of the default partition
/// spec, or `None` when the spec has no fields or the source column id is not present in
/// the current schema.
fn partition_column(metadata: &iceberg::spec::TableMetadata) -> Option<String> {
    let first_field = metadata.default_partition_spec().fields().first()?;
    let source = metadata
        .current_schema()
        .field_by_id(first_field.source_id)?;
    Some(source.name.clone())
}
fn group_by_partition(
batches: &[RecordBatch],
part_col: Option<&str>,
only_repo: Option<&str>,
) -> Result<Vec<(Option<String>, RecordBatch)>> {
if batches.is_empty() {
return Ok(Vec::new());
}
let schema = batches[0].schema();
let Some(col) = part_col else {
let merged = concat_batches(&schema, batches)?;
if merged.num_rows() == 0 {
return Ok(Vec::new());
}
return Ok(vec![(None, merged)]);
};
use arrow::compute::take;
let mut order: Vec<String> = Vec::new();
let mut buckets: HashMap<String, Vec<RecordBatch>> = HashMap::new();
for b in batches {
let arr = b
.column_by_name(col)
.ok_or_else(|| anyhow!("partition column `{col}` missing from batch"))?;
let sa = arr
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| anyhow!("partition column `{col}` is not Utf8"))?;
let mut per_value: HashMap<String, Vec<u32>> = HashMap::new();
for i in 0..b.num_rows() {
let v = if sa.is_null(i) { String::new() } else { sa.value(i).to_string() };
if let Some(want) = only_repo {
if v != want {
continue;
}
}
per_value.entry(v).or_default().push(i as u32);
}
for (v, idxs) in per_value {
let idx = arrow::array::UInt32Array::from(idxs);
let cols: Vec<_> = b
.columns()
.iter()
.map(|c| take(c.as_ref(), &idx, None))
.collect::<std::result::Result<_, _>>()?;
let part_batch = RecordBatch::try_new(b.schema(), cols)?;
if !buckets.contains_key(&v) {
order.push(v.clone());
}
buckets.entry(v).or_default().push(part_batch);
}
}
let mut out = Vec::with_capacity(order.len());
for v in order {
let parts = buckets.remove(&v).unwrap_or_default();
if parts.is_empty() {
continue;
}
let merged = concat_batches(&schema, &parts)?;
if merged.num_rows() > 0 {
out.push((Some(v), merged));
}
}
Ok(out)
}
fn data_file_partition_value(df: &DataFile, part_col: Option<&str>) -> Option<String> {
part_col?;
let p: &Struct = df.partition();
p.iter().next().flatten().map(|lit| literal_to_string(lit))
}
/// Renders a literal as text: string primitives yield their content unchanged, every other
/// literal falls back to its `Debug` representation.
fn literal_to_string(lit: &iceberg::spec::Literal) -> String {
    use iceberg::spec::{Literal, PrimitiveLiteral};
    if let Literal::Primitive(PrimitiveLiteral::String(text)) = lit {
        return text.clone();
    }
    format!("{lit:?}")
}
/// Builds a partition key for the table's default partition spec and current schema whose
/// single slot holds `value` as a string literal.
///
/// The function never returns `Err` itself; the `Result` keeps call sites uniform.
fn partition_key_for_value(
    tbl: &IceTable,
    value: &str,
) -> Result<iceberg::spec::PartitionKey> {
    use iceberg::spec::{Literal, PartitionKey, Struct};
    let meta = tbl.metadata();
    let row = Struct::from_iter(std::iter::once(Some(Literal::string(value))));
    let key = PartitionKey::new(
        meta.default_partition_spec().as_ref().clone(),
        meta.current_schema().clone(),
        row,
    );
    Ok(key)
}
/// Derives a snapshot id from the wall clock: nanoseconds since the Unix epoch, never less
/// than 1 (also the fallback when the clock reads before the epoch). When that value equals
/// `parent`, the next integer is returned so the new id differs from its parent.
fn fresh_snapshot_id(parent: Option<i64>) -> i64 {
    let candidate = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => (elapsed.as_nanos() as i64).max(1),
        Err(_) => 1,
    };
    match parent {
        Some(parent_id) if parent_id == candidate => candidate + 1,
        _ => candidate,
    }
}
/// Current wall-clock time in milliseconds since the Unix epoch, or 0 when the system clock
/// reads earlier than the epoch.
fn now_ms() -> i64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as i64,
        Err(_) => 0,
    }
}
/// Returns a fresh random (v4) UUID in its `simple` form, used as a file-name prefix.
fn unique_prefix() -> String {
    uuid::Uuid::new_v4().simple().to_string()
}
/// Normalises a file location into a comparable local path string.
///
/// A leading `file://` is removed, then the path is canonicalised through the filesystem.
/// When canonicalisation fails (for example because the file does not exist) the stripped
/// input is returned unchanged.
fn canon(p: &str) -> String {
    let local = match p.strip_prefix("file://") {
        Some(rest) => rest,
        None => p,
    };
    std::fs::canonicalize(local).map_or_else(
        |_| local.to_owned(),
        |resolved| resolved.to_string_lossy().into_owned(),
    )
}
/// Recursively appends to `out` every file under `dir` whose extension is listed in `exts`.
///
/// Only real directories are descended into. A symbolic link that points at a directory is
/// skipped entirely, so the walk cannot loop through a link cycle or wander outside `dir`;
/// this matters because the result feeds file deletion. Symbolic links to files are still
/// treated as files. Unreadable directories and entries are silently skipped (best effort).
fn collect_files(dir: &Path, out: &mut Vec<PathBuf>, exts: &[&str]) {
    let Ok(rd) = std::fs::read_dir(dir) else { return };
    for entry in rd.flatten() {
        let path = entry.path();
        // `DirEntry::file_type` does not follow symlinks, unlike `Path::is_dir`.
        let is_real_dir = match entry.file_type() {
            Ok(kind) if kind.is_symlink() => {
                if path.is_dir() {
                    continue;
                }
                false
            }
            Ok(kind) => kind.is_dir(),
            Err(_) => continue,
        };
        if is_real_dir {
            collect_files(&path, out, exts);
            continue;
        }
        let wanted = path
            .extension()
            .and_then(|e| e.to_str())
            .map_or(false, |ext| exts.contains(&ext));
        if wanted {
            out.push(path);
        }
    }
}
/// Integration-style tests: each one opens a fresh warehouse in a temporary directory,
/// appends symbol scans to create many small files and snapshots, then exercises
/// compaction and/or snapshot expiry and inspects both table metadata and the disk.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::knowledge::symbols::{SymbolRow, SymbolScan};
    use crate::warehouse::iceberg::{IcebergWarehouse, TABLE_SYMBOL_FACTS};
    use crate::warehouse::Warehouse;

    /// Builds a scan for `repo` with one public `fn` symbol per entry of `names`; symbols
    /// alternate between `src/zeta.rs` (even index) and `src/alpha.rs` (odd index).
    fn mk_scan(repo: &str, names: &[&str]) -> SymbolScan {
        SymbolScan {
            snapshot_id: uuid::Uuid::new_v4(),
            ts: chrono::Utc::now(),
            repo: repo.into(),
            symbols: names
                .iter()
                .enumerate()
                .map(|(i, name)| SymbolRow {
                    crate_name: repo.into(),
                    module_path: format!("{repo}::m"),
                    item_kind: "fn".into(),
                    item_name: (*name).into(),
                    visibility: "pub".into(),
                    file: format!("src/{}.rs", if i % 2 == 0 { "zeta" } else { "alpha" }),
                    line: i as u32,
                    doc_lines: 0,
                    signature: Some(format!("fn {name}()")),
                })
                .collect(),
            ..Default::default()
        }
    }

    /// Returns column `name` of `batch` as a string array; panics when it is missing or
    /// has another type.
    fn str_col<'a>(batch: &'a RecordBatch, name: &str) -> &'a StringArray {
        batch
            .column_by_name(name)
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap()
    }

    /// Counts files with extension `ext` under the table's `subdir` directory on disk.
    fn count_files_on_disk(root: &Path, table: &str, subdir: &str, ext: &str) -> usize {
        let root = root.canonicalize().unwrap_or_else(|_| root.to_path_buf());
        let dir = root.join("warehouse").join("nornir").join(table).join(subdir);
        let mut found = Vec::new();
        collect_files(&dir, &mut found, &[ext]);
        found.len()
    }

    /// Number of `.parquet` files under the table's `data` directory.
    fn count_data_files_on_disk(root: &Path, table: &str) -> usize {
        count_files_on_disk(root, table, "data", "parquet")
    }

    /// Number of `.avro` files under the table's `metadata` directory.
    fn count_avro_files_on_disk(root: &Path, table: &str) -> usize {
        count_files_on_disk(root, table, "metadata", "avro")
    }

    /// Canonical paths of every manifest list, manifest and data file reachable from the
    /// `keep` newest snapshots of `table`.
    fn retained_file_set(wh: &IcebergWarehouse, table: &str, keep: usize) -> HashSet<String> {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            let tbl = wh.catalog().load_table(&ident).await.unwrap();
            let md = tbl.metadata();
            let fio = tbl.file_io().clone();
            let mut snaps: Vec<_> = md.snapshots().map(|s| s.as_ref()).collect();
            snaps.sort_by_key(|s| std::cmp::Reverse(s.timestamp_ms()));
            let mut out = HashSet::new();
            for s in snaps.iter().take(keep) {
                out.insert(canon(s.manifest_list()));
                let ml = s.load_manifest_list(&fio, md).await.unwrap();
                for mf in ml.entries() {
                    out.insert(canon(&mf.manifest_path));
                    let m = mf.load_manifest(&fio).await.unwrap();
                    for e in m.entries() {
                        out.insert(canon(e.data_file().file_path()));
                    }
                }
            }
            out
        })
    }

    /// Reads the whole table as `(repo, file, item_name)` tuples.
    fn read_rows(wh: &IcebergWarehouse, table: &str) -> Vec<(String, String, String)> {
        let batches = wh.scan_arrow(table).unwrap();
        let mut rows = Vec::new();
        for b in &batches {
            let repo = str_col(b, "repo");
            let file = str_col(b, "file");
            let name = str_col(b, "item_name");
            for i in 0..b.num_rows() {
                rows.push((
                    repo.value(i).to_string(),
                    file.value(i).to_string(),
                    name.value(i).to_string(),
                ));
            }
        }
        rows
    }

    /// Paths of the data files listed by the current snapshot's manifests.
    fn head_data_files(wh: &IcebergWarehouse, table: &str) -> Vec<String> {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            let tbl = wh.catalog().load_table(&ident).await.unwrap();
            let md = tbl.metadata();
            let cur = md.current_snapshot().unwrap();
            let fio = tbl.file_io().clone();
            let ml = cur.load_manifest_list(&fio, md).await.unwrap();
            let mut out = Vec::new();
            for mf in ml.entries() {
                let m = mf.load_manifest(&fio).await.unwrap();
                for e in m.entries() {
                    if e.content_type() == DataContentType::Data {
                        out.push(e.data_file().file_path().to_string());
                    }
                }
            }
            out
        })
    }

    /// Number of snapshots recorded in the table metadata.
    fn snapshot_count(wh: &IcebergWarehouse, table: &str) -> usize {
        let ident = wh.table_ident(table);
        wh.block_on(async {
            wh.catalog().load_table(&ident).await.unwrap().metadata().snapshots().len()
        })
    }

    /// Five small appends across two repos are compacted: the head file count drops to at
    /// most two, row content is unchanged, rows inside each file are sorted by
    /// `(file, item_name)`, and each file's `repo` statistics cover exactly one repo.
    #[test]
    fn compact_merges_small_files_preserving_rows_sortedness_and_tightens_bounds() {
        use skade::parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
        use skade::parquet::file::reader::FileReader;
        use skade::parquet::file::serialized_reader::SerializedFileReader;

        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["mango", "apple", "zulu"])).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["delta", "banana"])).unwrap();
        wh.append_symbol_scan(&mk_scan("nordisk", &["sierra", "alpha"])).unwrap();
        wh.append_symbol_scan(&mk_scan("nordisk", &["tango", "bravo", "kilo"])).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["echo"])).unwrap();

        let before_head = head_data_files(&wh, TABLE_SYMBOL_FACTS).len();
        let before_rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert!(before_head >= 5, "many small head data files before compaction (got {before_head})");
        assert!(!before_rows.is_empty(), "RAGNARÖK: fixture has rows");

        let reports = compact_all(&wh, Some(TABLE_SYMBOL_FACTS), None).unwrap();
        let rep = &reports[0];
        assert!(!rep.skipped, "compaction ran (not skipped)");

        let after_head = head_data_files(&wh, TABLE_SYMBOL_FACTS);
        assert!(
            after_head.len() < before_head,
            "head data file count dropped: {before_head} -> {}",
            after_head.len(),
        );
        assert!(
            after_head.len() <= 2,
            "compacted to ≤2 files (one per repo partition), got {}",
            after_head.len()
        );
        assert_eq!(rep.files_before, before_head);
        assert_eq!(rep.files_after, after_head.len());

        let after_rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert_eq!(after_rows.len(), before_rows.len(), "row count conserved");
        let mut b = before_rows.clone();
        let mut a = after_rows.clone();
        b.sort();
        a.sort();
        assert_eq!(a, b, "content identical after compaction (multiset equal)");
        assert_eq!(rep.rows, before_rows.len() as u64);

        let mut file_repo_ranges: Vec<(String, String)> = Vec::new();
        for path in &after_head {
            let p = path.strip_prefix("file://").unwrap_or(path);

            // Rows inside one compacted file must be ordered by (file, item_name).
            let mut prev: Option<(String, String)> = None;
            let f = std::fs::File::open(p).unwrap();
            let rb = ParquetRecordBatchReaderBuilder::try_new(f).unwrap().build().unwrap();
            for batch in rb {
                let batch = batch.unwrap();
                let file_col = str_col(&batch, "file");
                let name_col = str_col(&batch, "item_name");
                for i in 0..batch.num_rows() {
                    let key = (file_col.value(i).to_string(), name_col.value(i).to_string());
                    if let Some(pv) = &prev {
                        assert!(pv <= &key, "compacted file rows sorted by (file,item_name): {pv:?} !<= {key:?}");
                    }
                    prev = Some(key);
                }
            }

            // The min/max statistics of `repo` over all row groups must coincide.
            let f2 = std::fs::File::open(p).unwrap();
            let reader = SerializedFileReader::new(f2).unwrap();
            let meta = reader.metadata();
            let (mut lo, mut hi): (Option<String>, Option<String>) = (None, None);
            for rg in 0..meta.num_row_groups() {
                for col in meta.row_group(rg).columns() {
                    if col.column_path().string() == "repo" {
                        let s = col.statistics().expect("repo stats present (manifest bounds)");
                        let min = String::from_utf8_lossy(s.min_bytes_opt().unwrap()).to_string();
                        let max = String::from_utf8_lossy(s.max_bytes_opt().unwrap()).to_string();
                        lo = Some(lo.map_or(min.clone(), |x| x.min(min)));
                        hi = Some(hi.map_or(max.clone(), |x| x.max(max)));
                    }
                }
            }
            let (lo, hi) = (lo.unwrap(), hi.unwrap());
            assert_eq!(lo, hi, "compacted file is a single repo partition ⇒ tight repo bound [{lo},{hi}]");
            file_repo_ranges.push((lo, hi));
        }
        let distinct: HashSet<_> = file_repo_ranges.iter().map(|(lo, _)| lo.clone()).collect();
        assert_eq!(
            distinct.len(),
            after_head.len(),
            "each compacted file covers a distinct repo (non-overlapping bounds)"
        );
        eprintln!(
            "[compact] symbol_facts: {before_head} → {} head data file(s), {} row(s) preserved, sorted, tight per-repo bounds {file_repo_ranges:?}",
            after_head.len(),
            after_rows.len(),
        );
    }

    /// With five snapshots and `keep_last = 2`, exactly two snapshots remain, stale avro
    /// metadata is deleted, and every file reachable from the two newest snapshots
    /// survives.
    #[test]
    fn expire_keeps_last_n_and_gcs_orphans_without_touching_retained_files() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        for batch in [
            &["a1", "a2"][..],
            &["b1"][..],
            &["c1", "c2", "c3"][..],
            &["d1"][..],
            &["e1", "e2"][..],
        ] {
            wh.append_symbol_scan(&mk_scan("facett", batch)).unwrap();
        }
        let snaps_before = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        assert!(snaps_before >= 5, "K snapshots created (got {snaps_before})");
        let live_before = retained_file_set(&wh, TABLE_SYMBOL_FACTS, 2);
        let avro_before = count_avro_files_on_disk(&root, TABLE_SYMBOL_FACTS);

        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 2, None).unwrap();
        let rep = reports.iter().find(|r| r.table == TABLE_SYMBOL_FACTS).unwrap();

        let snaps_after = snapshot_count(&wh, TABLE_SYMBOL_FACTS);
        assert_eq!(snaps_after, 2, "exactly 2 snapshots retained (was {snaps_before})");
        assert_eq!(rep.snapshots_after, 2);
        assert!(rep.snapshots_before >= 5);

        let rows = read_rows(&wh, TABLE_SYMBOL_FACTS);
        assert!(!rows.is_empty(), "current snapshot reads non-empty after expiry");
        assert!(rows.iter().any(|(_, _, n)| n == "e1"), "head snapshot's newest rows still readable");
        assert!(rows.iter().any(|(_, _, n)| n == "a1"), "head still carries the oldest appended rows");

        let avro_after = count_avro_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(rep.files_deleted > 0, "deleted some orphan files");
        assert!(
            avro_after < avro_before,
            "stale manifest/manifest-list avros GC'd: {avro_before} -> {avro_after}",
        );
        for f in &live_before {
            assert!(Path::new(f).exists(), "retained-snapshot file was wrongly deleted: {f}");
        }
        eprintln!(
            "[expire] symbol_facts: {snaps_before} → {snaps_after} snapshot(s); deleted {} orphan avro/file(s) ({} → {} avros on disk), {} byte(s) reclaimed; all {} retained files intact",
            rep.files_deleted, avro_before, avro_after, rep.bytes_reclaimed, live_before.len(),
        );
    }

    /// Compaction followed by expiry with `keep_last = 1` removes the superseded small
    /// data files from disk while row content stays identical.
    #[test]
    fn compact_then_expire_reclaims_superseded_data_files() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        for batch in [&["a1", "a2"][..], &["b1"][..], &["c1", "c2"][..], &["d1"][..]] {
            wh.append_symbol_scan(&mk_scan("facett", batch)).unwrap();
        }
        let rows_before = read_rows(&wh, TABLE_SYMBOL_FACTS);
        let data_before = count_data_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(data_before >= 4, "many small data files before (got {data_before})");

        compact_all(&wh, Some(TABLE_SYMBOL_FACTS), None).unwrap();
        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 1, None).unwrap();
        let rep = reports.iter().find(|r| r.table == TABLE_SYMBOL_FACTS).unwrap();

        let live = retained_file_set(&wh, TABLE_SYMBOL_FACTS, 1);
        let data_after = count_data_files_on_disk(&root, TABLE_SYMBOL_FACTS);
        assert!(
            data_after < data_before,
            "data parquet files reclaimed: {data_before} -> {data_after}",
        );
        assert!(rep.files_deleted > 0 && rep.bytes_reclaimed > 0, "disk reclaimed");

        let rows_after = read_rows(&wh, TABLE_SYMBOL_FACTS);
        let (mut a, mut b) = (rows_after.clone(), rows_before.clone());
        a.sort();
        b.sort();
        assert_eq!(a, b, "row content identical through compact+expire");
        for f in &live {
            assert!(Path::new(f).exists(), "retained file wrongly deleted: {f}");
        }
        eprintln!(
            "[compact+expire] symbol_facts: data files {data_before} → {data_after} on disk, {} byte(s) reclaimed, {} rows preserved, retained files intact",
            rep.bytes_reclaimed, rows_after.len(),
        );
    }

    /// With fewer snapshots than `keep_last`, expiry removes nothing.
    #[test]
    fn expire_keeps_all_when_under_threshold() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().to_path_buf();
        let wh = IcebergWarehouse::open(&root).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["a"])).unwrap();
        wh.append_symbol_scan(&mk_scan("facett", &["b"])).unwrap();
        let before = snapshot_count(&wh, TABLE_SYMBOL_FACTS);

        let reports = expire_all(&wh, Some(TABLE_SYMBOL_FACTS), 10, None).unwrap();
        let rep = reports.iter().find(|r| r.table == TABLE_SYMBOL_FACTS).unwrap();
        assert_eq!(rep.files_deleted, 0, "nothing aged out under keep-last=10");
        assert_eq!(snapshot_count(&wh, TABLE_SYMBOL_FACTS), before, "all snapshots retained");
    }
}