use std::collections::HashMap;
use crate::types::level::ParquetFileMeta;
use crate::iceberg::deletion_vector::DeletionVector;
/// A single data file entry, pairing where the file lives with its size,
/// row count, and Parquet-level metadata.
#[derive(Clone, Debug)]
pub struct IcebergDataFile {
/// Location of the file as a string path (for example `data/L0/a.parquet`).
pub path: String,
/// Size of the file; presumably in bytes — TODO confirm against the writer.
pub file_size: u64,
/// Number of rows recorded for the file.
pub num_rows: u64,
/// Parquet file metadata carried alongside this entry.
pub meta: ParquetFileMeta,
}
/// An accumulated set of changes: files to add, paths to remove, deletion
/// vectors keyed by path, and free-form string properties.
#[derive(Clone, Debug)]
pub struct SnapshotTransaction {
/// Data files appended through `add_file`, in insertion order.
pub adds: Vec<IcebergDataFile>,
/// Paths appended through `remove_file`, in insertion order.
pub removes: Vec<String>,
/// Deletion vectors keyed by path; `add_dv` merges repeated entries for the
/// same key into one vector.
pub dvs: HashMap<String, DeletionVector>,
/// String key/value properties written through `set_prop`.
pub props: HashMap<String, String>,
}
impl SnapshotTransaction {
pub fn new() -> Self {
Self {
adds: Vec::new(),
removes: Vec::new(),
dvs: HashMap::new(),
props: HashMap::new(),
}
}
pub fn add_file(&mut self, file: IcebergDataFile) {
self.adds.push(file);
}
pub fn remove_file(&mut self, path: String) {
self.removes.push(path);
}
pub fn add_dv(&mut self, path: String, dv: DeletionVector) {
self.dvs
.entry(path)
.and_modify(|existing| existing.union_with(&dv))
.or_insert(dv);
}
pub fn set_prop(&mut self, key: impl Into<String>, value: impl Into<String>) {
self.props.insert(key.into(), value.into());
}
pub fn is_empty(&self) -> bool {
self.adds.is_empty() && self.removes.is_empty() && self.dvs.is_empty()
}
pub fn num_adds(&self) -> usize {
self.adds.len()
}
pub fn num_removes(&self) -> usize {
self.removes.len()
}
}
impl Default for SnapshotTransaction {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::level::Level;

    /// Builds a fixture data file at `path` assigned to `level`; every other
    /// field uses a fixed placeholder value.
    fn make_data_file(path: &str, level: u8) -> IcebergDataFile {
        let meta = ParquetFileMeta {
            level: Level(level),
            seq_min: 1,
            seq_max: 10,
            key_min: vec![0x01],
            key_max: vec![0xFF],
            num_rows: 100,
            file_size: 1024,
            dv_path: None,
            dv_offset: None,
            dv_length: None,
            format: None,
            column_stats: None,
        };
        IcebergDataFile {
            path: path.to_owned(),
            file_size: 1024,
            num_rows: 100,
            meta,
        }
    }

    /// A freshly constructed transaction reports empty with zero counts.
    #[test]
    fn empty_txn() {
        let fresh = SnapshotTransaction::new();
        assert!(fresh.is_empty());
        assert_eq!(fresh.num_adds(), 0);
        assert_eq!(fresh.num_removes(), 0);
    }

    /// Adds, removes, and properties are all recorded on the transaction.
    #[test]
    fn add_and_remove() {
        let mut txn = SnapshotTransaction::new();
        for (path, level) in [("data/L0/a.parquet", 0), ("data/L1/b.parquet", 1)] {
            txn.add_file(make_data_file(path, level));
        }
        txn.remove_file(String::from("data/L0/old.parquet"));
        txn.set_prop("merutable.job", "compaction");

        assert!(!txn.is_empty());
        assert_eq!(txn.num_adds(), 2);
        assert_eq!(txn.num_removes(), 1);
        assert_eq!(
            txn.props.get("merutable.job").map(String::as_str),
            Some("compaction")
        );
    }

    /// Two deletion vectors registered under the same path end up as a single
    /// merged entry.
    #[test]
    fn dv_merge_in_txn() {
        let target = "data/L0/a.parquet";
        let mut txn = SnapshotTransaction::new();

        // Row 5 appears in both batches, so the test expects 3 marked rows.
        for rows in [[0, 5], [5, 10]] {
            let mut dv = DeletionVector::new();
            for row in rows {
                dv.mark_deleted(row);
            }
            txn.add_dv(target.into(), dv);
        }

        let merged = &txn.dvs[target];
        assert_eq!(merged.cardinality(), 3);
    }
}