use std::collections::HashMap;
use tracing::{debug, instrument};
use iceberg_rust_spec::spec::{manifest::DataFile, schema::Schema, snapshot::SnapshotReference};
use crate::table::transaction::append::append_summary;
use crate::table::transaction::operation::SequenceGroup;
use crate::{catalog::commit::CommitTable, error::Error, table::Table};
use self::operation::Operation;
pub(crate) mod append;
pub(crate) mod operation;
pub(crate) mod overwrite;
// Fixed slot indices into `TableTransaction::operations`. Each operation kind owns
// exactly one slot, and `commit` executes the occupied slots in ascending index
// order, so these values also fix the order in which queued operations are applied.

/// Slot for [`Operation::AddSchema`].
pub(crate) const ADD_SCHEMA_INDEX: usize = 0;
/// Slot for [`Operation::SetDefaultSpec`].
pub(crate) const SET_DEFAULT_SPEC_INDEX: usize = 1;
/// Slot for [`Operation::Append`] (data and delete files).
pub(crate) const APPEND_INDEX: usize = 2;
/// Slot for [`Operation::AppendSequenceGroups`].
pub(crate) const APPEND_SEQUENCE_GROUPS_INDEX: usize = 3;
/// Slot for [`Operation::Replace`].
pub(crate) const REPLACE_INDEX: usize = 4;
/// Slot for [`Operation::Overwrite`].
pub(crate) const OVERWRITE_INDEX: usize = 5;
/// Slot for [`Operation::UpdateProperties`].
pub(crate) const UPDATE_PROPERTIES_INDEX: usize = 6;
/// Slot for [`Operation::SetSnapshotRef`].
pub(crate) const SET_SNAPSHOT_REF_INDEX: usize = 7;
/// Slot for [`Operation::ExpireSnapshots`].
pub(crate) const EXPIRE_SNAPSHOTS_INDEX: usize = 8;
/// Total number of operation slots. Derived from the last index so that it
/// cannot drift out of sync when a new slot is appended.
pub(crate) const NUM_OPERATIONS: usize = EXPIRE_SNAPSHOTS_INDEX + 1;
/// A batch of changes to a [`Table`] that are applied together by
/// [`TableTransaction::commit`].
///
/// The builder methods only queue operations; nothing is sent to the catalog
/// until `commit` is awaited. Dropping a transaction without committing it
/// therefore discards every queued change, which is why the type is `#[must_use]`.
#[must_use = "a table transaction does nothing until `commit` is awaited"]
pub struct TableTransaction<'table> {
    /// Table being modified. On a successful commit it is overwritten in place
    /// with the table returned by the catalog.
    table: &'table mut Table,
    /// One optional slot per operation kind, indexed by the `*_INDEX` constants.
    operations: Vec<Option<Operation>>,
    /// Branch name copied into the append, overwrite and replace operations
    /// queued on this transaction.
    branch: Option<String>,
}
impl<'table> TableTransaction<'table> {
    /// Creates an empty transaction on `table`.
    ///
    /// `branch` is copied into every append, overwrite and replace operation
    /// queued on this transaction.
    pub(crate) fn new(table: &'table mut Table, branch: Option<&str>) -> Self {
        TableTransaction {
            table,
            // One empty slot per operation kind; `commit` runs the occupied slots
            // in index order.
            operations: (0..NUM_OPERATIONS).map(|_| None).collect(),
            branch: branch.map(ToString::to_string),
        }
    }

    /// Queues `schema` to be added to the table.
    ///
    /// Replaces any schema queued earlier in this transaction.
    pub fn add_schema(mut self, schema: Schema) -> Self {
        self.operations[ADD_SCHEMA_INDEX] = Some(Operation::AddSchema(schema));
        self
    }

    /// Queues an operation that sets the table's default spec to `spec_id`.
    ///
    /// Replaces any default-spec change queued earlier in this transaction.
    pub fn set_default_spec(mut self, spec_id: i32) -> Self {
        self.operations[SET_DEFAULT_SPEC_INDEX] = Some(Operation::SetDefaultSpec(spec_id));
        self
    }

    /// Queues `files` as data files to append to the table.
    ///
    /// Repeated calls (and calls to [`append_delete`](Self::append_delete)) share a
    /// single append operation. The operation's additional summary is recomputed
    /// over *all* queued data files, so it never describes only the first batch.
    ///
    /// # Panics
    ///
    /// Panics if [`append_sequence_group`](Self::append_sequence_group) was already
    /// called on this transaction.
    pub fn append_data(mut self, files: Vec<DataFile>) -> Self {
        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
            if let Operation::Append {
                data_files,
                additional_summary,
                ..
            } = operation
            {
                // Move the new files in instead of cloning them.
                data_files.extend(files);
                // The summary must cover every queued data file. Previously it was
                // only computed for the first batch, or left `None` when
                // `append_delete` created the operation.
                *additional_summary = append_summary(&*data_files);
            }
        } else {
            let additional_summary = append_summary(&files);
            self.operations[APPEND_INDEX] = Some(Operation::Append {
                branch: self.branch.clone(),
                data_files: files,
                delete_files: Vec::new(),
                additional_summary,
            });
        }
        self
    }

    /// Queues `files` as delete files to append to the table.
    ///
    /// Repeated calls (and calls to [`append_data`](Self::append_data)) share a
    /// single append operation. The additional summary is left untouched here.
    ///
    /// # Panics
    ///
    /// Panics if [`append_sequence_group`](Self::append_sequence_group) was already
    /// called on this transaction.
    pub fn append_delete(mut self, files: Vec<DataFile>) -> Self {
        if self.operations[APPEND_SEQUENCE_GROUPS_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        if let Some(ref mut operation) = self.operations[APPEND_INDEX] {
            if let Operation::Append { delete_files, .. } = operation {
                delete_files.extend(files);
            }
        } else {
            self.operations[APPEND_INDEX] = Some(Operation::Append {
                branch: self.branch.clone(),
                data_files: Vec::new(),
                delete_files: files,
                additional_summary: None,
            });
        }
        self
    }

    /// Queues one group of data and delete files as a [`SequenceGroup`].
    ///
    /// Each call pushes a new group onto a single sequence-group append operation.
    /// (Presumably each group is committed with its own sequence number; confirm
    /// in `operation`.)
    ///
    /// # Panics
    ///
    /// Panics if [`append_data`](Self::append_data) or
    /// [`append_delete`](Self::append_delete) was already called on this
    /// transaction.
    pub fn append_sequence_group(
        mut self,
        data_files: Vec<DataFile>,
        delete_files: Vec<DataFile>,
    ) -> Self {
        if self.operations[APPEND_INDEX].is_some() {
            panic!("Cannot use append and append_sequence_group in the same transaction");
        }
        let group = SequenceGroup {
            delete_files,
            data_files,
        };
        if let Some(ref mut operation) = self.operations[APPEND_SEQUENCE_GROUPS_INDEX] {
            if let Operation::AppendSequenceGroups {
                sequence_groups, ..
            } = operation
            {
                sequence_groups.push(group);
            }
        } else {
            self.operations[APPEND_SEQUENCE_GROUPS_INDEX] = Some(Operation::AppendSequenceGroups {
                branch: self.branch.clone(),
                sequence_groups: vec![group],
            });
        }
        self
    }

    /// Queues an overwrite that adds `files` and overwrites the file paths listed in
    /// `files_to_overwrite`.
    ///
    /// `files_to_overwrite` maps a key to a list of file paths. Presumably the key is
    /// a manifest path and the values are data-file paths; verify against
    /// `Operation::Overwrite`.
    ///
    /// Repeated calls accumulate into one operation:
    /// - new data files are appended;
    /// - path lists under an already-present key are concatenated rather than
    ///   replaced;
    /// - the additional summary is recomputed over all queued data files.
    pub fn overwrite(
        mut self,
        files: Vec<DataFile>,
        files_to_overwrite: HashMap<String, Vec<String>>,
    ) -> Self {
        if let Some(ref mut operation) = self.operations[OVERWRITE_INDEX] {
            if let Operation::Overwrite {
                data_files,
                files_to_overwrite: queued,
                additional_summary,
                ..
            } = operation
            {
                data_files.extend(files);
                // `HashMap::extend` would replace the Vec stored under a repeated
                // key and silently drop earlier paths, so merge per key instead.
                for (key, paths) in files_to_overwrite {
                    queued.entry(key).or_default().extend(paths);
                }
                *additional_summary = append_summary(&*data_files);
            }
        } else {
            let additional_summary = append_summary(&files);
            self.operations[OVERWRITE_INDEX] = Some(Operation::Overwrite {
                branch: self.branch.clone(),
                data_files: files,
                files_to_overwrite,
                additional_summary,
            });
        }
        self
    }

    /// Queues `files` for a replace operation.
    ///
    /// Repeated calls (and calls to
    /// [`replace_with_lineage`](Self::replace_with_lineage)) accumulate files into a
    /// single replace operation. Any lineage summary already queued is kept.
    pub fn replace(mut self, files: Vec<DataFile>) -> Self {
        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
            // Match regardless of `additional_summary`. The previous pattern
            // required it to be `None`, so after `replace_with_lineage` these files
            // were silently dropped.
            if let Operation::Replace { files: queued, .. } = operation {
                queued.extend(files);
            }
        } else {
            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
                branch: self.branch.clone(),
                files,
                additional_summary: None,
            });
        }
        self
    }

    /// Queues `files` for a replace operation and attaches `additional_summary`
    /// (lineage information) to it.
    ///
    /// Files accumulate with earlier `replace`/`replace_with_lineage` calls. The
    /// lineage summary replaces any previously queued one rather than being merged.
    pub fn replace_with_lineage(
        mut self,
        files: Vec<DataFile>,
        additional_summary: HashMap<String, String>,
    ) -> Self {
        if let Some(ref mut operation) = self.operations[REPLACE_INDEX] {
            if let Operation::Replace {
                files: queued,
                additional_summary: queued_lineage,
                ..
            } = operation
            {
                queued.extend(files);
                *queued_lineage = Some(additional_summary);
            }
        } else {
            self.operations[REPLACE_INDEX] = Some(Operation::Replace {
                branch: self.branch.clone(),
                files,
                additional_summary: Some(additional_summary),
            });
        }
        self
    }

    /// Queues table property updates.
    ///
    /// Entries from repeated calls are concatenated, in call order, into one operation.
    pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
        if let Some(ref mut operation) = self.operations[UPDATE_PROPERTIES_INDEX] {
            if let Operation::UpdateProperties(props) = operation {
                props.extend(entries);
            }
        } else {
            self.operations[UPDATE_PROPERTIES_INDEX] = Some(Operation::UpdateProperties(entries));
        }
        self
    }

    /// Queues setting the snapshot reference `entry.0` to `entry.1`.
    ///
    /// Replaces any snapshot-ref change queued earlier in this transaction.
    pub fn set_snapshot_ref(mut self, entry: (String, SnapshotReference)) -> Self {
        self.operations[SET_SNAPSHOT_REF_INDEX] = Some(Operation::SetSnapshotRef(entry));
        self
    }

    /// Queues a snapshot-expiration operation.
    ///
    /// All arguments are passed through unchanged to `Operation::ExpireSnapshots`,
    /// which defines their exact meaning. (`older_than` is presumably a timestamp in
    /// milliseconds; confirm in `operation`.) The corresponding field is named
    /// `_clean_orphan_files`, which suggests that flag may not be acted upon yet;
    /// verify before relying on it.
    ///
    /// Replaces any expiration queued earlier in this transaction.
    pub fn expire_snapshots(
        mut self,
        older_than: Option<i64>,
        retain_last: Option<usize>,
        clean_orphan_files: bool,
        retain_ref_snapshots: bool,
        dry_run: bool,
    ) -> Self {
        self.operations[EXPIRE_SNAPSHOTS_INDEX] = Some(Operation::ExpireSnapshots {
            older_than,
            retain_last,
            _clean_orphan_files: clean_orphan_files,
            retain_ref_snapshots,
            dry_run,
        });
        self
    }

    /// Executes every queued operation and commits the resulting updates to the
    /// catalog.
    ///
    /// Operations run in slot order, each against the table metadata as it was
    /// before this commit started. If the operations produce no updates, the
    /// catalog is not contacted and `Ok(())` is returned. On success `*table` is
    /// replaced with the table returned by the catalog.
    ///
    /// # Errors
    ///
    /// Returns an error if any operation fails to execute or if the catalog's
    /// `update_table` call fails. In either case the borrowed table is left
    /// unchanged.
    #[instrument(name = "iceberg_rust::table::transaction::commit", level = "debug", skip(self), fields(
        table_identifier = %self.table.identifier,
        branch = ?self.branch
    ))]
    pub async fn commit(self) -> Result<(), Error> {
        let catalog = self.table.catalog();
        let identifier = self.table.identifier.clone();
        let (mut requirements, mut updates) = (Vec::new(), Vec::new());
        for operation in self.operations.into_iter().flatten() {
            let (requirement, update) = operation
                .execute(self.table.metadata(), self.table.object_store())
                .await?;
            if let Some(requirement) = requirement {
                requirements.push(requirement);
            }
            updates.extend(update);
        }
        if updates.is_empty() {
            return Ok(());
        }
        debug!(
            "Committing {} updates to table {}: requirements={:?}, updates={:?}",
            updates.len(),
            identifier,
            requirements,
            updates
        );
        let new_table = catalog
            .clone()
            .update_table(CommitTable {
                identifier,
                requirements,
                updates,
            })
            .await?;
        *self.table = new_table;
        Ok(())
    }
}