use iceberg::table::Table;
use iceberg::{Catalog, Error, ErrorKind, Result, TableCommit};
use redb::ReadableTable;
use crate::catalog::RedbCatalog;
use crate::error::map_redb;
use crate::keys::table_key;
use crate::store::TABLES;
impl RedbCatalog {
pub async fn atomic_release(
&self,
commits: impl IntoIterator<Item = TableCommit>,
) -> Result<Vec<Table>> {
struct Staged {
ident: iceberg::TableIdent,
base_metadata_location: String,
staged_metadata_location: String,
staged_table: Table,
}
let mut staged: Vec<Staged> = Vec::new();
for commit in commits {
let ident = commit.identifier().clone();
let current = self.load_table(&ident).await?;
let base = current.metadata_location_result()?.to_string();
let table = commit.apply(current)?;
let new_loc = table.metadata_location_result()?.to_string();
staged.push(Staged {
ident,
base_metadata_location: base,
staged_metadata_location: new_loc,
staged_table: table,
});
}
for s in &staged {
s.staged_table
.metadata()
.write_to(s.staged_table.file_io(), &s.staged_metadata_location)
.await?;
}
{
let db = self.store.db.lock().await;
let mut write = db.begin_write().map_err(map_redb)?;
write.set_durability(self.store.durability);
{
let mut tables_tbl = write.open_table(TABLES).map_err(map_redb)?;
for s in &staged {
let key = table_key(&self.name, &s.ident);
let current = tables_tbl
.get(key.as_str())
.map_err(map_redb)?
.map(|v| v.value().to_string());
match current {
Some(loc) if loc == s.base_metadata_location => {}
Some(_) => {
return Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit conflicted for table {} in atomic batch", s.ident),
)
.with_retryable(true));
}
None => {
return Err(Error::new(
ErrorKind::TableNotFound,
format!("Table {} disappeared during atomic batch", s.ident),
));
}
}
}
for s in &staged {
let key = table_key(&self.name, &s.ident);
tables_tbl
.insert(key.as_str(), s.staged_metadata_location.as_str())
.map_err(map_redb)?;
}
}
for s in &staged {
crate::store::record_commit(
&write,
&table_key(&self.name, &s.ident),
s.staged_table.metadata().current_snapshot_id(),
&s.staged_metadata_location,
)
.map_err(map_redb)?;
}
write.commit().map_err(map_redb)?;
for s in &staged {
self.store.pointers.insert(
&table_key(&self.name, &s.ident),
&s.staged_metadata_location,
);
}
}
self.store.maybe_trigger_compaction();
Ok(staged.into_iter().map(|s| s.staged_table).collect())
}
}