use std::collections::HashMap;
use iceberg_rust_spec::spec::{
manifest::DataFile, materialized_view_metadata::SourceTable, schema::Schema,
snapshot::SnapshotReference,
};
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
use self::operation::Operation;
use super::delete_files;
pub(crate) mod operation;
/// Key under which the pending append operation is stored in a transaction's operation map.
pub(crate) static APPEND_KEY: &str = "append";
/// Key under which the pending rewrite operation (with or without lineage) is stored.
pub(crate) static REWRITE_KEY: &str = "rewrite";
/// Key under which the pending add-schema operation is stored.
pub(crate) static ADD_SCHEMA_KEY: &str = "add-schema";
/// Key under which the pending set-default-spec operation is stored.
pub(crate) static SET_DEFAULT_SPEC_KEY: &str = "set-default-spec";
/// Key under which the pending update-properties operation is stored.
pub(crate) static UPDATE_PROPERTIES_KEY: &str = "update-properties";
/// Key under which the pending set-snapshot-reference operation is stored.
pub(crate) static SET_SNAPSHOT_REF_KEY: &str = "set-ref";
/// A builder that collects operations against a [`Table`] and applies them together
/// when `commit` is called.
///
/// Each kind of operation occupies one slot in the operation map, keyed by one of the
/// `*_KEY` statics of this module.
pub struct TableTransaction<'table> {
/// The table being modified; it is overwritten with the catalog's response on commit.
table: &'table mut Table,
/// Pending operations, at most one per operation key.
operations: HashMap<String, Operation>,
/// Optional branch name handed to newly created append and rewrite operations.
branch: Option<String>,
}
impl<'table> TableTransaction<'table> {
pub fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
TableTransaction {
table,
operations: HashMap::new(),
branch: branch.map(ToString::to_string),
}
}
/// Queues the addition of `schema` to the table.
///
/// A previously queued add-schema operation is replaced, so only the schema from the
/// last call is kept.
pub fn add_schema(mut self, schema: Schema) -> Self {
    let key = String::from(ADD_SCHEMA_KEY);
    let operation = Operation::AddSchema(schema);
    self.operations.insert(key, operation);
    self
}
/// Queues setting the table's default spec to `spec_id`.
///
/// A previously queued set-default-spec operation is replaced by this one.
pub fn set_default_spec(mut self, spec_id: i32) -> Self {
    let key = String::from(SET_DEFAULT_SPEC_KEY);
    let operation = Operation::SetDefaultSpec(spec_id);
    self.operations.insert(key, operation);
    self
}
/// Queues `files` to be appended to the table.
///
/// Repeated calls accumulate into a single append operation: the new files are moved
/// onto the end of the already queued ones. The first call creates the operation with
/// the transaction's branch and no lineage.
pub fn append(mut self, files: Vec<DataFile>) -> Self {
    match self.operations.get_mut(APPEND_KEY) {
        // Merge into the queued append whatever its other fields are, moving the
        // incoming files instead of cloning them.
        Some(Operation::NewAppend { files: queued, .. }) => queued.extend(files),
        // Only append operations are stored under this key by the methods of this
        // type; anything else is deliberately left untouched.
        Some(_) => {}
        None => {
            let operation = Operation::NewAppend {
                branch: self.branch.clone(),
                files,
                lineage: None,
            };
            self.operations.insert(APPEND_KEY.to_owned(), operation);
        }
    }
    self
}
/// Queues `files` as the new contents of a rewrite operation.
///
/// Repeated calls accumulate into a single rewrite operation. If a rewrite is already
/// queued, with or without lineage, the new files are moved onto the end of its file
/// list and its lineage is left as it is. The first call creates the operation with the
/// transaction's branch and no lineage.
pub fn rewrite(mut self, files: Vec<DataFile>) -> Self {
    match self.operations.get_mut(REWRITE_KEY) {
        // Match regardless of lineage: a rewrite queued through `rewrite_with_lineage`
        // must still receive these files rather than silently dropping them.
        Some(Operation::Rewrite { files: queued, .. }) => queued.extend(files),
        // Only rewrite operations are stored under this key by the methods of this
        // type; anything else is deliberately left untouched.
        Some(_) => {}
        None => {
            let operation = Operation::Rewrite {
                branch: self.branch.clone(),
                files,
                lineage: None,
            };
            self.operations.insert(REWRITE_KEY.to_owned(), operation);
        }
    }
    self
}
/// Queues `files` as the new contents of a rewrite operation and records `lineage` on it.
///
/// If a rewrite is already queued, the new files are moved onto the end of its file list
/// and its lineage is overwritten with `lineage`. Otherwise a new rewrite operation is
/// created with the transaction's branch.
pub fn rewrite_with_lineage(mut self, files: Vec<DataFile>, lineage: Vec<SourceTable>) -> Self {
    match self.operations.get_mut(REWRITE_KEY) {
        Some(Operation::Rewrite {
            files: queued,
            lineage: queued_lineage,
            ..
        }) => {
            // Both arguments are owned, so they are moved in rather than cloned.
            queued.extend(files);
            *queued_lineage = Some(lineage);
        }
        // Only rewrite operations are stored under this key by the methods of this
        // type; anything else is deliberately left untouched.
        Some(_) => {}
        None => {
            let operation = Operation::Rewrite {
                branch: self.branch.clone(),
                files,
                lineage: Some(lineage),
            };
            self.operations.insert(REWRITE_KEY.to_owned(), operation);
        }
    }
    self
}
/// Queues key/value `entries` to be applied to the table properties.
///
/// Repeated calls accumulate: the new entries are moved onto the end of the already
/// queued ones, in call order.
pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
    match self.operations.get_mut(UPDATE_PROPERTIES_KEY) {
        // The entries are owned, so they are moved in rather than cloned pair by pair.
        Some(Operation::UpdateProperties(queued)) => queued.extend(entries),
        // Only update-properties operations are stored under this key by the methods
        // of this type; anything else is deliberately left untouched.
        Some(_) => {}
        None => {
            self.operations.insert(
                UPDATE_PROPERTIES_KEY.to_owned(),
                Operation::UpdateProperties(entries),
            );
        }
    }
    self
}
/// Queues setting a snapshot reference, given as a `(name, reference)` pair.
///
/// A previously queued set-snapshot-ref operation is replaced by this one.
pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
    let key = String::from(SET_SNAPSHOT_REF_KEY);
    let operation = Operation::SetSnapshotRef(entry);
    self.operations.insert(key, operation);
    self
}
/// Executes all queued operations and commits the result through the table's catalog.
///
/// Every operation is executed against the table's current metadata and object store.
/// The requirements and updates they produce are sent to the catalog as one
/// `CommitTable`. When a rewrite operation was queued, `delete_files` is then called
/// with the pre-commit metadata. Finally the borrowed table is overwritten with the
/// table returned by the catalog.
///
/// # Errors
///
/// Returns the first error raised while executing an operation, updating the catalog,
/// or deleting files.
///
/// NOTE(review): operations run in `HashMap` iteration order, which is unspecified.
/// NOTE(review): if `delete_files` fails, this returns before the borrowed table is
/// replaced, even though the catalog update has already gone through.
pub async fn commit(self) -> Result<(), Error> {
    let catalog = self.table.catalog();
    let object_store = self.table.object_store();
    let identifier = self.table.identifier.clone();

    // Hold on to the pre-commit metadata only when a rewrite is part of the transaction.
    let has_rewrite = self
        .operations
        .values()
        .any(|operation| matches!(operation, Operation::Rewrite { .. }));
    let stale_metadata = if has_rewrite {
        Some(self.table.metadata())
    } else {
        None
    };

    let mut requirements = Vec::new();
    let mut updates = Vec::new();
    for operation in self.operations.into_values() {
        let (requirement, update) = operation
            .execute(self.table.metadata(), self.table.object_store())
            .await?;
        // An operation yields at most one requirement.
        requirements.extend(requirement);
        updates.extend(update);
    }

    let commit = CommitTable {
        identifier,
        requirements,
        updates,
    };
    let new_table = catalog.clone().update_table(commit).await?;

    if let Some(old_metadata) = stale_metadata {
        delete_files(old_metadata, object_store).await?;
    }

    *self.table = new_table;
    Ok(())
}
}