use std::collections::HashMap;
use iceberg_rust_spec::{
materialized_view_metadata::{RefreshState, REFRESH_STATE},
spec::{manifest::DataFile, types::StructType, view_metadata::ViewRepresentation},
};
use crate::{
catalog::commit::{CommitTable, CommitView},
error::Error,
table::{
delete_all_table_files,
transaction::{operation::Operation as TableOperation, APPEND_INDEX, REPLACE_INDEX},
},
view::transaction::operation::Operation as ViewOperation,
};
use super::MaterializedView;
/// A transaction that accumulates metadata changes to a [`MaterializedView`]
/// and data changes to its storage table, applying all of them together on
/// commit.
pub struct Transaction<'view> {
// The view under modification; overwritten with the committed result.
materialized_view: &'view mut MaterializedView,
// Queued metadata operations for the view itself.
view_operations: Vec<ViewOperation>,
// Fixed slots for storage-table operations, indexed by operation kind
// (e.g. `APPEND_INDEX`, `REPLACE_INDEX`) so repeated calls of the same
// kind merge into one operation. `None` means "no operation queued".
storage_table_operations: Vec<Option<TableOperation>>,
// Branch name attached to queued operations — presumably the default
// branch when `None`; confirm against the table/view operation docs.
branch: Option<String>,
}
impl<'view> Transaction<'view> {
pub fn new(view: &'view mut MaterializedView, branch: Option<&str>) -> Self {
Transaction {
materialized_view: view,
view_operations: vec![],
storage_table_operations: (0..6).map(|_| None).collect(), branch: branch.map(ToString::to_string),
}
}
pub fn update_representations(
mut self,
representations: Vec<ViewRepresentation>,
schema: StructType,
) -> Self {
self.view_operations
.push(ViewOperation::UpdateRepresentations {
representations,
schema,
branch: self.branch.clone(),
});
self
}
pub fn update_properties(mut self, entries: Vec<(String, String)>) -> Self {
self.view_operations
.push(ViewOperation::UpdateProperties(entries));
self
}
pub fn full_refresh(
mut self,
files: Vec<DataFile>,
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_state = serde_json::to_string(&refresh_state)?;
if let Some(ref mut operation) = self.storage_table_operations[REPLACE_INDEX] {
if let TableOperation::Replace {
branch: _,
files: old,
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
)]));
}
} else {
self.storage_table_operations[REPLACE_INDEX] = Some(TableOperation::Replace {
branch: self.branch.clone(),
files,
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
)])),
});
}
Ok(self)
}
pub fn append(
mut self,
files: Vec<DataFile>,
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_state = serde_json::to_string(&refresh_state)?;
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
data_files: old,
delete_files: _,
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
)]));
}
} else {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: files,
delete_files: Vec::new(),
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
)])),
});
}
Ok(self)
}
pub fn delete(
mut self,
files: Vec<DataFile>,
refresh_state: RefreshState,
) -> Result<Self, Error> {
let refresh_state = serde_json::to_string(&refresh_state)?;
if let Some(ref mut operation) = self.storage_table_operations[APPEND_INDEX] {
if let TableOperation::Append {
branch: _,
data_files: _,
delete_files: old,
additional_summary: old_lineage,
} = operation
{
old.extend_from_slice(&files);
*old_lineage = Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state.clone(),
)]));
}
} else {
self.storage_table_operations[APPEND_INDEX] = Some(TableOperation::Append {
branch: self.branch.clone(),
data_files: Vec::new(),
delete_files: files,
additional_summary: Some(HashMap::from_iter(vec![(
REFRESH_STATE.to_owned(),
refresh_state,
)])),
});
}
Ok(self)
}
pub async fn commit(self) -> Result<(), Error> {
let catalog = self.materialized_view.catalog();
let identifier = self.materialized_view.identifier().clone();
let storage_table = self.materialized_view.storage_table().await?;
let delete_data = if !self.storage_table_operations.is_empty() {
let (mut table_requirements, mut table_updates) = (Vec::new(), Vec::new());
let delete_data = if self
.storage_table_operations
.iter()
.flatten()
.any(|x| matches!(x, TableOperation::Replace { .. }))
{
Some(storage_table.metadata().clone())
} else {
None
};
for operation in self.storage_table_operations.into_iter().flatten() {
let (requirement, update) = operation
.execute(storage_table.metadata(), storage_table.object_store())
.await?;
if let Some(requirement) = requirement {
table_requirements.push(requirement);
}
table_updates.extend(update);
}
storage_table
.catalog()
.update_table(CommitTable {
identifier: storage_table.identifier().clone(),
requirements: table_requirements,
updates: table_updates,
})
.await?;
delete_data
} else {
None
};
let (mut view_requirements, mut view_updates) = (Vec::new(), Vec::new());
for operation in self.view_operations {
let (requirement, update) = operation.execute(&self.materialized_view.metadata).await?;
if let Some(requirement) = requirement {
view_requirements.push(requirement);
}
view_updates.extend(update);
}
let new_matview = catalog
.clone()
.update_materialized_view(CommitView {
identifier,
requirements: view_requirements,
updates: view_updates,
})
.await?;
if let Some(old_metadata) = delete_data {
delete_all_table_files(&old_metadata, storage_table.object_store()).await?;
}
*self.materialized_view = new_matview;
Ok(())
}
}