use std::collections::HashMap;
use iceberg_rust_spec::spec::{
manifest::DataFile, materialized_view_metadata::SourceTable, types::StructType,
view_metadata::ViewRepresentation,
};
use crate::{
catalog::commit::{CommitTable, CommitView},
error::Error,
table::{
delete_files,
transaction::{operation::Operation as TableOperation, REWRITE_KEY},
},
view::transaction::operation::Operation as ViewOperation,
};
use super::MaterializedView;
pub struct Transaction<'view> {
materialized_view: &'view mut MaterializedView,
view_operations: Vec<ViewOperation>,
storage_table_operations: HashMap<String, TableOperation>,
branch: Option<String>,
}
impl<'view> Transaction<'view> {
pub fn new(view: &'view mut MaterializedView, branch: Option<&str>) -> Self {
Transaction {
materialized_view: view,
view_operations: vec![],
storage_table_operations: HashMap::new(),
branch: branch.map(ToString::to_string),
}
}
pub fn update_representation(
mut self,
representation: ViewRepresentation,
schema: StructType,
) -> Self {
self.view_operations
.push(ViewOperation::UpdateRepresentation {
representation,
schema,
branch: self.branch.clone(),
});
self
}
pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
self.view_operations
.push(ViewOperation::UpdateProperties(entries));
self
}
pub fn full_refresh(mut self, files: Vec<DataFile>, lineage: Vec<SourceTable>) -> Self {
self.storage_table_operations
.entry(REWRITE_KEY.to_owned())
.and_modify(|mut x| {
if let TableOperation::Rewrite {
branch: _,
files: old,
lineage: old_lineage,
} = &mut x
{
old.extend_from_slice(&files);
*old_lineage = Some(lineage.clone());
}
})
.or_insert(TableOperation::Rewrite {
branch: self.branch.clone(),
files,
lineage: Some(lineage),
});
self
}
pub async fn commit(self) -> Result<(), Error> {
let catalog = self.materialized_view.catalog();
let identifier = self.materialized_view.identifier().clone();
let delete_data = if !self.storage_table_operations.is_empty() {
let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
let storage_table = self.materialized_view.storage_table().await?;
let delete_data = if self.storage_table_operations.values().any(|x| match x {
TableOperation::Rewrite {
branch: _,
files: _,
lineage: _,
} => true,
_ => false,
}) {
Some(storage_table.metadata().clone())
} else {
None
};
for operation in self.storage_table_operations.into_values() {
let (requirement, update) = operation
.execute(
&storage_table.metadata(),
self.materialized_view.object_store(),
)
.await?;
if let Some(requirement) = requirement {
table_requirements.push(requirement);
}
table_updates.extend(update);
}
storage_table
.catalog()
.update_table(CommitTable {
identifier: storage_table.identifier().clone(),
requirements: table_requirements,
updates: table_updates,
})
.await?;
delete_data
} else {
None
};
let (mut view_requirements, mut view_updates) = (Vec::new(), Vec::new());
for operation in self.view_operations {
let (requirement, update) = operation.execute(&self.materialized_view.metadata).await?;
if let Some(requirement) = requirement {
view_requirements.push(requirement);
}
view_updates.extend(update);
}
let new_matview = catalog
.clone()
.update_materialized_view(CommitView {
identifier,
requirements: view_requirements,
updates: view_updates,
})
.await?;
if let Some(old_metadata) = delete_data {
delete_files(&old_metadata, self.materialized_view.object_store()).await?;
}
*self.materialized_view = new_matview;
Ok(())
}
}