use std::collections::HashMap;
use std::ops::Bound;
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use iceberg::io::FileIO;
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
use iceberg::{
Catalog, Error, ErrorKind, MetadataLocation, Namespace, NamespaceIdent, Result, TableCommit,
TableCreation, TableIdent, TableRequirement, TableUpdate,
};
use redb::ReadableTable;
use crate::error::map_redb;
use crate::keys::{
SEP, catalog_prefix, namespace_key, namespace_prop_key, namespace_prop_prefix, ns_path,
prefix_upper, table_key, table_prefix,
};
use crate::store::{NAMESPACE_PROPS, NAMESPACES, Store, TABLES};
/// Namespace property consulted by `create_table` when the creation request has no
/// explicit location: if present, its value is the base under which the table
/// directory is placed.
const NAMESPACE_LOCATION_PROPERTY_KEY: &str = "location";
/// Iceberg catalog that keeps namespace and table pointers in a redb-backed [`Store`].
#[derive(Debug, Clone)]
pub struct RedbCatalog {
/// Catalog name; passed to every key builder (`table_key`, `namespace_key`, ...),
/// so all stored keys are scoped by it.
pub(crate) name: String,
/// Root used to derive a default table location when neither the creation request
/// nor the namespace's `location` property supplies one.
pub(crate) warehouse_location: String,
/// File IO used to read and write table metadata files.
pub(crate) fileio: FileIO,
/// Backing store: the database handle (`db`), the pointer cache (`pointers`),
/// the static snapshot index and the durability setting used for write transactions.
pub(crate) store: Store,
/// Cache of table metadata, looked up by metadata location.
pub(crate) meta_cache: crate::meta_cache::MetadataCache,
/// Cache of `Table` handles, looked up by metadata location and table identifier.
pub(crate) table_cache: crate::table_cache::TableHandleCache,
}
impl RedbCatalog {
/// Returns the name this catalog was configured with.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Returns the warehouse root location configured for this catalog.
pub fn warehouse_location(&self) -> &str {
    self.warehouse_location.as_str()
}
/// Returns the filesystem path recorded by the backing store.
pub fn db_path(&self) -> &std::path::Path {
    let path: &std::path::Path = &self.store.path;
    path
}
pub fn file_io(&self) -> &FileIO {
&self.fileio
}
/// Returns the generation value currently reported by the store's pointer cache.
pub fn pointer_generation(&self) -> u64 {
    let pointers = &self.store.pointers;
    pointers.generation()
}
/// Reads the value stored under `META_COMMIT_SEQ` in the `META` table.
///
/// Returns `0` when the entry has never been written.
///
/// # Errors
/// Any redb failure, converted through `map_redb`.
pub async fn commit_seq(&self) -> Result<u64> {
    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let meta_tbl = txn.open_table(crate::store::META).map_err(map_redb)?;
    let seq = match meta_tbl
        .get(crate::store::META_COMMIT_SEQ)
        .map_err(map_redb)?
    {
        Some(stored) => stored.value(),
        None => 0,
    };
    Ok(seq)
}
/// Resolves `identifier` to its current table metadata, discarding the location.
///
/// # Errors
/// Propagates every error from `resolve`.
pub async fn resolve_metadata(&self, identifier: &TableIdent) -> Result<Arc<TableMetadata>> {
    let (_location, metadata) = self.resolve(identifier).await?;
    Ok(metadata)
}
/// Finds the current metadata location for `identifier`.
///
/// The pointer cache is consulted first; only on a miss is the `TABLES` table read
/// under the database lock.
///
/// # Errors
/// `TableNotFound` when neither source knows the table; redb failures via `map_redb`.
async fn resolve_location(&self, identifier: &TableIdent) -> Result<String> {
    let key = table_key(&self.name, identifier);

    // Fast path: served from the pointer cache without touching the database.
    if let Some(cached) = self.store.pointers.get(&key) {
        return Ok(cached.to_string());
    }

    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
    let stored = tables_tbl
        .get(key.as_str())
        .map_err(map_redb)?
        .map(|entry| entry.value().to_string());
    stored.ok_or_else(|| no_such_table(identifier))
}
/// Resolves `identifier` to `(metadata location, metadata)`.
///
/// The location comes from `resolve_location`; the metadata is fetched through the
/// metadata cache for that location.
async fn resolve(&self, identifier: &TableIdent) -> Result<(String, Arc<TableMetadata>)> {
    let location = self.resolve_location(identifier).await?;
    let loaded = self.meta_cache.get_or_load(&self.fileio, &location).await;
    loaded.map(|metadata| (location, metadata))
}
pub async fn compact_static_index(&self) -> Result<usize> {
let db = self.store.db.lock().await;
self.store.compact_with_db(&db).map_err(Error::from)
}
/// Looks up the metadata location recorded for `snapshot_id` of the table stored
/// under `table_key`.
///
/// The static index is tried first; a hit only counts when it belongs to the same
/// table key. Otherwise the `COMMITS` table is read under the database lock.
/// Returns `Ok(None)` when no commit was recorded for that snapshot.
async fn resolve_location_at(
    &self,
    table_key: &str,
    snapshot_id: i64,
) -> Result<Option<String>> {
    if let Some(index) = self.store.static_index.load_full() {
        match index.lookup(snapshot_id) {
            Some(entry) if entry.table_key.as_ref() == table_key => {
                return Ok(Some(entry.metadata_location.to_string()));
            }
            _ => {}
        }
    }

    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let commits_tbl = txn.open_table(crate::store::COMMITS).map_err(map_redb)?;
    let commit_key = crate::store::commit_key(table_key, snapshot_id);
    let recorded = commits_tbl
        .get(commit_key.as_slice())
        .map_err(map_redb)?;
    Ok(recorded.map(|location| location.value().to_string()))
}
/// Loads the table metadata that was committed for `snapshot_id`.
///
/// # Errors
/// The `snapshot_not_found` error when no commit is recorded for that snapshot,
/// plus any lookup or metadata-load failure.
pub async fn resolve_metadata_at(
    &self,
    identifier: &TableIdent,
    snapshot_id: i64,
) -> Result<Arc<TableMetadata>> {
    let key = table_key(&self.name, identifier);
    let location = match self.resolve_location_at(&key, snapshot_id).await? {
        Some(found) => found,
        None => return Err(snapshot_not_found(identifier, snapshot_id)),
    };
    self.meta_cache.get_or_load(&self.fileio, &location).await
}
/// Builds a `Table` handle pinned to the metadata committed for `snapshot_id`.
///
/// A handle already cached for that metadata location is returned as-is; otherwise
/// the metadata is loaded and a new handle is built and inserted into the cache.
///
/// # Errors
/// The `snapshot_not_found` error when no commit is recorded for that snapshot,
/// plus any lookup, load or build failure.
pub async fn load_table_at(&self, identifier: &TableIdent, snapshot_id: i64) -> Result<Table> {
    let key = table_key(&self.name, identifier);
    let location = match self.resolve_location_at(&key, snapshot_id).await? {
        Some(found) => found,
        None => return Err(snapshot_not_found(identifier, snapshot_id)),
    };

    let cached = self.table_cache.get(&location, identifier).await;
    match cached {
        Some(table) => Ok(table),
        None => {
            let metadata = self
                .meta_cache
                .get_or_load(&self.fileio, &location)
                .await?;
            self.table_cache
                .build_and_insert(&self.fileio, identifier, location, metadata)
                .await
        }
    }
}
/// Resolves several snapshots of one table in a single call.
///
/// The result has one slot per entry of `snapshot_ids`, in the same order. A slot is
/// `None` when no metadata location is recorded for that snapshot.
///
/// # Errors
/// Fails on the first redb error or metadata-load error.
pub async fn resolve_many(
    &self,
    identifier: &TableIdent,
    snapshot_ids: &[i64],
) -> Result<Vec<Option<Arc<TableMetadata>>>> {
    let key = table_key(&self.name, identifier);
    let mut locations: Vec<Option<String>> = vec![None; snapshot_ids.len()];

    // Pass 1: static index. A hit only counts when it belongs to this table.
    if let Some(index) = self.store.static_index.load_full() {
        for (slot, hit) in locations.iter_mut().zip(index.lookup_many(snapshot_ids)) {
            match hit {
                Some(entry) if entry.table_key.as_ref() == key => {
                    *slot = Some(entry.metadata_location.to_string());
                }
                _ => {}
            }
        }
    }

    // Pass 2: fill remaining gaps from the COMMITS table. The database is only
    // locked when at least one slot is still unresolved.
    let has_misses = locations.iter().any(Option::is_none);
    if has_misses {
        let guard = self.store.db.lock().await;
        let txn = guard.begin_read().map_err(map_redb)?;
        let commits_tbl = txn.open_table(crate::store::COMMITS).map_err(map_redb)?;
        let pending = locations
            .iter_mut()
            .zip(snapshot_ids)
            .filter(|(slot, _)| slot.is_none());
        for (slot, &sid) in pending {
            let commit_key = crate::store::commit_key(&key, sid);
            if let Some(found) = commits_tbl.get(commit_key.as_slice()).map_err(map_redb)? {
                *slot = Some(found.value().to_string());
            }
        }
    }

    // Pass 3: turn every resolved location into metadata, one after another.
    let mut resolved = Vec::with_capacity(snapshot_ids.len());
    for location in locations {
        let metadata = match location {
            Some(loc) => Some(self.meta_cache.get_or_load(&self.fileio, &loc).await?),
            None => None,
        };
        resolved.push(metadata);
    }
    Ok(resolved)
}
/// Applies `updates` to the current state of table `ident` and commits the result.
///
/// Steps:
/// 1. Load the current table and remember its metadata location as the commit base.
/// 2. Check every requirement against the current metadata.
/// 3. Apply the updates, in order, to a builder seeded from the current metadata.
/// 4. Derive the next-version metadata location from the current one.
/// 5. Hand the staged table to `persist_commit`, which writes the metadata file and
///    swaps the catalog pointer only if it still equals the base location.
///
/// # Errors
/// Fails if the table cannot be loaded, a requirement is violated, an update cannot
/// be applied, the current metadata location cannot be parsed, or `persist_commit`
/// fails (including a retryable `CatalogCommitConflicts`).
pub async fn commit_table(
    &self,
    ident: TableIdent,
    requirements: Vec<TableRequirement>,
    updates: Vec<TableUpdate>,
) -> Result<Table> {
    let current_table = self.load_table(&ident).await?;
    let current_metadata_location = current_table.metadata_location_result()?.to_string();

    for requirement in &requirements {
        requirement.check(Some(current_table.metadata()))?;
    }

    let mut metadata_builder = current_table
        .metadata()
        .clone()
        .into_builder(Some(current_metadata_location.clone()));
    for update in updates {
        metadata_builder = update.apply(metadata_builder)?;
    }
    let new_metadata = metadata_builder.build()?.metadata;

    // The staged file is the successor version of the file we read.
    let staged_metadata_location = MetadataLocation::from_str(&current_metadata_location)?
        .with_next_version()
        .to_string();

    let staged_table = Table::builder()
        .file_io(current_table.file_io().clone())
        .identifier(ident.clone())
        .metadata(new_metadata)
        .metadata_location(staged_metadata_location)
        .build()?;

    self.persist_commit(ident, current_metadata_location, staged_table)
        .await
}
/// Writes the staged metadata file and swaps the table pointer to it.
///
/// The pointer is only replaced when the value stored in `TABLES` still equals
/// `base_metadata_location`; the same write closure also records the commit via
/// `record_commit`.
///
/// # Errors
/// - `CatalogCommitConflicts` (marked retryable) when the stored pointer differs
///   from `base_metadata_location`.
/// - `TableNotFound` when the table has no pointer at all.
/// - Any metadata write or redb failure.
///
/// NOTE(review): the metadata file is written before the pointer check, and nothing
/// in this block removes it when the check fails; confirm whether cleanup happens
/// elsewhere.
async fn persist_commit(
&self,
ident: TableIdent,
base_metadata_location: String,
staged_table: Table,
) -> Result<Table> {
let staged_metadata_location = staged_table.metadata_location_result()?.to_string();
// Write the new metadata file first so the pointer never references a missing file.
staged_table
.metadata()
.write_to(staged_table.file_io(), &staged_metadata_location)
.await?;
let key = table_key(&self.name, &ident);
let snapshot_id = staged_table.metadata().current_snapshot_id();
// Owned copies for the `move` closure below; `ident` is not used after this point.
let conflict_ident = ident.clone();
let staged_loc = staged_metadata_location.clone();
// The closure receives the write transaction from `Store::group_commit`.
// NOTE(review): presumably several queued closures share one transaction; confirm
// against `Store::group_commit`.
self.store
.group_commit(Box::new(move |write| {
{
let mut tables_tbl = write.open_table(TABLES).map_err(map_redb)?;
let existing = tables_tbl
.get(key.as_str())
.map_err(map_redb)?
.map(|v| v.value().to_string());
match existing {
// Pointer unchanged since we read it: install the staged location.
Some(loc) if loc == base_metadata_location => {
tables_tbl
.insert(key.as_str(), staged_loc.as_str())
.map_err(map_redb)?;
}
// Pointer moved underneath us: another commit won.
Some(_) => {
return Err(Error::new(
ErrorKind::CatalogCommitConflicts,
format!("Commit conflicted for table: {conflict_ident}"),
)
.with_retryable(true));
}
// No pointer at all: the table is not in the catalog.
None => return Err(no_such_table(&conflict_ident)),
}
}
// Record snapshot -> metadata location for later lookups by snapshot id.
crate::store::record_commit(write, &key, snapshot_id, &staged_loc)
.map_err(map_redb)?;
Ok(crate::store::CommitOutcome::insert(key, staged_loc))
}))
.await?;
Ok(staged_table)
}
}
fn no_such_namespace(ns: &NamespaceIdent) -> Error {
Error::new(
ErrorKind::NamespaceNotFound,
format!("Namespace does not exist: {ns}"),
)
}
fn namespace_already_exists(ns: &NamespaceIdent) -> Error {
Error::new(
ErrorKind::NamespaceAlreadyExists,
format!("Namespace already exists: {ns}"),
)
}
fn no_such_table(t: &TableIdent) -> Error {
Error::new(
ErrorKind::TableNotFound,
format!("Table does not exist: {t}"),
)
}
fn table_already_exists(t: &TableIdent) -> Error {
Error::new(
ErrorKind::TableAlreadyExists,
format!("Table already exists: {t}"),
)
}
fn snapshot_not_found(t: &TableIdent, snapshot_id: i64) -> Error {
Error::new(
ErrorKind::TableNotFound,
format!("No recorded commit for snapshot {snapshot_id} of table {t}"),
)
}
#[async_trait]
impl Catalog for RedbCatalog {
/// Lists the direct children of `parent`, or the top-level namespaces when `parent`
/// is `None`.
///
/// Scans every namespace key of this catalog and keeps those exactly one level
/// below the requested parent. Levels are separated by `.` in the stored key.
///
/// # Errors
/// Any redb failure, or a failure to build a `NamespaceIdent` from a stored key.
async fn list_namespaces(
    &self,
    parent: Option<&NamespaceIdent>,
) -> Result<Vec<NamespaceIdent>> {
    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;

    let prefix = catalog_prefix(&self.name);
    let upper = prefix_upper(&prefix);
    // An empty `upper` means the prefix has no exclusive successor: scan to the end.
    let upper_bound = if upper.is_empty() {
        Bound::Unbounded
    } else {
        Bound::Excluded(upper.as_str())
    };
    let entries = ns_tbl
        .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
        .map_err(map_redb)?;

    let parent_prefix = parent.map(|p| format!("{}.", ns_path(p)));
    let parent_components = match parent {
        Some(p) => p.clone().inner().len(),
        None => 0,
    };

    let mut children = Vec::new();
    for entry in entries {
        let (raw_key, _) = entry.map_err(map_redb)?;
        let ns_str = &raw_key.value()[prefix.len()..];
        match parent_prefix.as_deref() {
            None => {
                // Top level: a single, non-empty component.
                if ns_str.is_empty() || ns_str.contains('.') {
                    continue;
                }
                children.push(NamespaceIdent::from_strs(ns_str.split('.'))?);
            }
            Some(pp) => {
                let tail = match ns_str.strip_prefix(pp) {
                    Some(rest) => rest,
                    None => continue,
                };
                // Direct child: exactly one more non-empty component after the parent.
                if tail.is_empty() || tail.contains('.') {
                    continue;
                }
                let parts: Vec<&str> = ns_str.split('.').collect();
                if parts.len() == parent_components + 1 {
                    children.push(NamespaceIdent::from_strs(parts)?);
                }
            }
        }
    }
    Ok(children)
}
/// Creates `namespace` and stores its `properties` in a single write transaction.
///
/// # Errors
/// `NamespaceAlreadyExists` when the namespace key is already present (the
/// transaction is dropped without committing); any redb failure.
async fn create_namespace(
    &self,
    namespace: &NamespaceIdent,
    properties: HashMap<String, String>,
) -> Result<Namespace> {
    let guard = self.store.db.lock().await;
    let mut txn = guard.begin_write().map_err(map_redb)?;
    txn.set_durability(self.store.durability);
    {
        let mut ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
        let ns_key = namespace_key(&self.name, namespace);
        let already_present = ns_tbl.get(ns_key.as_str()).map_err(map_redb)?.is_some();
        if already_present {
            return Err(namespace_already_exists(namespace));
        }
        // The namespace row carries no payload; its presence is the record.
        ns_tbl
            .insert(ns_key.as_str(), &[] as &[u8])
            .map_err(map_redb)?;

        let mut props_tbl = txn.open_table(NAMESPACE_PROPS).map_err(map_redb)?;
        for (prop_name, prop_value) in properties.iter() {
            let prop_key = namespace_prop_key(&self.name, namespace, prop_name);
            props_tbl
                .insert(prop_key.as_str(), prop_value.as_str())
                .map_err(map_redb)?;
        }
    }
    txn.commit().map_err(map_redb)?;
    Ok(Namespace::with_properties(namespace.clone(), properties))
}
/// Returns `namespace` together with all of its stored properties.
///
/// Properties are read by scanning the keys that start with the namespace's
/// property prefix; the property name is the key with that prefix removed.
///
/// # Errors
/// `NamespaceNotFound` when the namespace is absent; any redb failure.
async fn get_namespace(&self, namespace: &NamespaceIdent) -> Result<Namespace> {
    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
    let ns_key = namespace_key(&self.name, namespace);
    let exists = ns_tbl.get(ns_key.as_str()).map_err(map_redb)?.is_some();
    if !exists {
        return Err(no_such_namespace(namespace));
    }

    let props_tbl = txn.open_table(NAMESPACE_PROPS).map_err(map_redb)?;
    let prefix = namespace_prop_prefix(&self.name, namespace);
    let upper = prefix_upper(&prefix);
    // An empty `upper` means the prefix has no exclusive successor: scan to the end.
    let upper_bound = if upper.is_empty() {
        Bound::Unbounded
    } else {
        Bound::Excluded(upper.as_str())
    };
    let entries = props_tbl
        .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
        .map_err(map_redb)?;

    let props = entries
        .map(|entry| {
            entry
                .map(|(full_key, value)| {
                    let prop_name = full_key.value()[prefix.len()..].to_string();
                    (prop_name, value.value().to_string())
                })
                .map_err(map_redb)
        })
        .collect::<Result<HashMap<_, _>>>()?;
    Ok(Namespace::with_properties(namespace.clone(), props))
}
/// Reports whether `namespace` has a row in the `NAMESPACES` table.
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
    let ns_key = namespace_key(&self.name, namespace);
    let found = ns_tbl.get(ns_key.as_str()).map_err(map_redb)?;
    Ok(found.is_some())
}
/// Replaces the full property set of `namespace` with `properties`.
///
/// Every existing property row is removed and the new rows inserted inside one
/// write transaction, so properties missing from `properties` are dropped.
///
/// # Errors
/// `NamespaceNotFound` when the namespace is absent; any redb failure.
async fn update_namespace(
    &self,
    namespace: &NamespaceIdent,
    properties: HashMap<String, String>,
) -> Result<()> {
    let guard = self.store.db.lock().await;
    let mut txn = guard.begin_write().map_err(map_redb)?;
    txn.set_durability(self.store.durability);
    {
        {
            // Scoped so the namespace table handle is released before the
            // properties table is opened.
            let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
            let ns_key = namespace_key(&self.name, namespace);
            let exists = ns_tbl.get(ns_key.as_str()).map_err(map_redb)?.is_some();
            if !exists {
                return Err(no_such_namespace(namespace));
            }
        }

        let mut props_tbl = txn.open_table(NAMESPACE_PROPS).map_err(map_redb)?;
        let prefix = namespace_prop_prefix(&self.name, namespace);
        let upper = prefix_upper(&prefix);
        let upper_bound = if upper.is_empty() {
            Bound::Unbounded
        } else {
            Bound::Excluded(upper.as_str())
        };
        // Collect keys first: the range borrows the table, so removal must wait
        // until the iterator is gone.
        let stale_keys: Vec<String> = props_tbl
            .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
            .map_err(map_redb)?
            .map(|entry| {
                entry
                    .map(|(full_key, _)| full_key.value().to_string())
                    .map_err(map_redb)
            })
            .collect::<Result<Vec<_>>>()?;
        for stale in &stale_keys {
            props_tbl.remove(stale.as_str()).map_err(map_redb)?;
        }

        for (prop_name, prop_value) in properties.iter() {
            let prop_key = namespace_prop_key(&self.name, namespace, prop_name);
            props_tbl
                .insert(prop_key.as_str(), prop_value.as_str())
                .map_err(map_redb)?;
        }
    }
    txn.commit().map_err(map_redb)?;
    Ok(())
}
/// Removes `namespace` and all of its properties in one write transaction.
///
/// The namespace must not contain any table: the `TABLES` range under the
/// namespace's table prefix is probed, and the drop is refused if it yields an entry.
///
/// # Errors
/// - `NamespaceNotFound` when the namespace is absent.
/// - `Unexpected` with "Namespace ... is not empty" when a table exists under it.
/// - Any redb failure, including one raised while probing for tables (it is
///   propagated rather than reported as "not empty").
///
/// NOTE(review): only tables are checked; child namespaces are not inspected here.
async fn drop_namespace(&self, namespace: &NamespaceIdent) -> Result<()> {
    let db = self.store.db.lock().await;
    let mut write = db.begin_write().map_err(map_redb)?;
    write.set_durability(self.store.durability);
    {
        let mut ns_tbl = write.open_table(NAMESPACES).map_err(map_redb)?;
        let key = namespace_key(&self.name, namespace);
        if ns_tbl.get(key.as_str()).map_err(map_redb)?.is_none() {
            return Err(no_such_namespace(namespace));
        }

        {
            // Emptiness probe; scoped so the TABLES handle and its iterator are
            // released before anything is modified.
            let tables_tbl = write.open_table(TABLES).map_err(map_redb)?;
            let prefix = table_prefix(&self.name, namespace);
            let upper = prefix_upper(&prefix);
            let upper_bound = if upper.is_empty() {
                Bound::Unbounded
            } else {
                Bound::Excluded(upper.as_str())
            };
            let mut iter = tables_tbl
                .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
                .map_err(map_redb)?;
            if let Some(first) = iter.next() {
                // A storage failure must surface as such, not as "not empty".
                first.map_err(map_redb)?;
                return Err(Error::new(
                    ErrorKind::Unexpected,
                    format!("Namespace {namespace} is not empty"),
                ));
            }
        }

        ns_tbl.remove(key.as_str()).map_err(map_redb)?;

        let mut props_tbl = write.open_table(NAMESPACE_PROPS).map_err(map_redb)?;
        let prefix = namespace_prop_prefix(&self.name, namespace);
        let upper = prefix_upper(&prefix);
        let upper_bound = if upper.is_empty() {
            Bound::Unbounded
        } else {
            Bound::Excluded(upper.as_str())
        };
        // Collect keys first: the range borrows the table, so removal must wait
        // until the iterator is gone.
        let to_remove: Vec<String> = props_tbl
            .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
            .map_err(map_redb)?
            .map(|e| e.map(|(k, _)| k.value().to_string()).map_err(map_redb))
            .collect::<Result<Vec<_>>>()?;
        for k in to_remove {
            props_tbl.remove(k.as_str()).map_err(map_redb)?;
        }
    }
    write.commit().map_err(map_redb)?;
    Ok(())
}
/// Lists the tables stored directly under `namespace`.
///
/// Keys are scanned by the namespace's table prefix. A key counts as a table of this
/// namespace when the part after the prefix is non-empty and contains no `SEP`.
///
/// # Errors
/// `NamespaceNotFound` when the namespace is absent; any redb failure.
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;

    let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
    let ns_key = namespace_key(&self.name, namespace);
    let exists = ns_tbl.get(ns_key.as_str()).map_err(map_redb)?.is_some();
    if !exists {
        return Err(no_such_namespace(namespace));
    }

    let tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
    let prefix = table_prefix(&self.name, namespace);
    let upper = prefix_upper(&prefix);
    // An empty `upper` means the prefix has no exclusive successor: scan to the end.
    let upper_bound = if upper.is_empty() {
        Bound::Unbounded
    } else {
        Bound::Excluded(upper.as_str())
    };
    let entries = tables_tbl
        .range::<&str>((Bound::Included(prefix.as_str()), upper_bound))
        .map_err(map_redb)?;

    let mut idents = Vec::new();
    for entry in entries {
        let (raw_key, _) = entry.map_err(map_redb)?;
        let tail = &raw_key.value()[prefix.len()..];
        if tail.is_empty() || tail.contains(SEP) {
            continue;
        }
        idents.push(TableIdent::new(namespace.clone(), tail.to_string()));
    }
    Ok(idents)
}
/// Creates a new table in `namespace`.
///
/// Location resolution, in order of precedence:
/// 1. `creation.location` when given;
/// 2. `<namespace "location" property>/<table name>`;
/// 3. `<warehouse_location>/<namespace parts joined by "/">/<table name>`.
///
/// The initial metadata file is written to
/// `<location>/metadata/00000-<table uuid>.metadata.json` before the pointer is
/// inserted. Existence is re-checked inside the write transaction so that a
/// concurrent creator cannot be overwritten.
///
/// # Errors
/// `NamespaceNotFound`, `TableAlreadyExists`, metadata build/write failures and
/// any redb failure.
async fn create_table(
    &self,
    namespace: &NamespaceIdent,
    creation: TableCreation,
) -> Result<Table> {
    let namespace_present = self.namespace_exists(namespace).await?;
    if !namespace_present {
        return Err(no_such_namespace(namespace));
    }

    let tbl_ident = TableIdent::new(namespace.clone(), creation.name.clone());
    if self.table_exists(&tbl_ident).await? {
        return Err(table_already_exists(&tbl_ident));
    }

    let location = match creation.location.clone() {
        Some(explicit) => explicit,
        None => {
            let ns_props = self.get_namespace(namespace).await?.properties().clone();
            let base = ns_props
                .get(NAMESPACE_LOCATION_PROPERTY_KEY)
                .cloned()
                .unwrap_or_else(|| {
                    format!(
                        "{}/{}",
                        self.warehouse_location,
                        namespace.clone().inner().join("/")
                    )
                });
            format!("{}/{}", base, tbl_ident.name())
        }
    };
    // For an explicit location this re-sets the same value; otherwise it fills it in.
    let tbl_creation = TableCreation {
        location: Some(location.clone()),
        ..creation
    };

    let tbl_metadata = TableMetadataBuilder::from_table_creation(tbl_creation)?
        .build()?
        .metadata;
    let tbl_metadata_location = format!(
        "{}/metadata/00000-{}.metadata.json",
        location,
        tbl_metadata.uuid()
    );
    tbl_metadata
        .write_to(&self.fileio, &tbl_metadata_location)
        .await?;

    let pointer_key = table_key(&self.name, &tbl_ident);
    {
        let guard = self.store.db.lock().await;
        let mut txn = guard.begin_write().map_err(map_redb)?;
        txn.set_durability(self.store.durability);
        {
            let mut tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
            // Authoritative check: the earlier `table_exists` ran outside this transaction.
            let taken = tables_tbl
                .get(pointer_key.as_str())
                .map_err(map_redb)?
                .is_some();
            if taken {
                return Err(table_already_exists(&tbl_ident));
            }
            tables_tbl
                .insert(pointer_key.as_str(), tbl_metadata_location.as_str())
                .map_err(map_redb)?;
        }
        crate::store::record_commit(
            &txn,
            &pointer_key,
            tbl_metadata.current_snapshot_id(),
            &tbl_metadata_location,
        )
        .map_err(map_redb)?;
        txn.commit().map_err(map_redb)?;
        // Only publish to the pointer cache once the transaction is committed.
        self.store
            .pointers
            .insert(&pointer_key, &tbl_metadata_location);
    }
    // Runs after the database lock has been released.
    self.store.maybe_trigger_compaction();

    Ok(Table::builder()
        .file_io(self.fileio.clone())
        .metadata_location(tbl_metadata_location)
        .identifier(tbl_ident)
        .metadata(tbl_metadata)
        .build()?)
}
/// Loads the current version of table `identifier`.
///
/// A handle already cached for the current metadata location is returned as-is;
/// otherwise the metadata is loaded and a new handle is built and cached.
///
/// # Errors
/// `TableNotFound` when the table is unknown; metadata load or build failures.
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
    let location = self.resolve_location(identifier).await?;

    let cached = self.table_cache.get(&location, identifier).await;
    match cached {
        Some(table) => Ok(table),
        None => {
            let metadata = self
                .meta_cache
                .get_or_load(&self.fileio, &location)
                .await?;
            self.table_cache
                .build_and_insert(&self.fileio, identifier, location, metadata)
                .await
        }
    }
}
/// Removes the catalog pointer for table `identifier`.
///
/// Only the `TABLES` row and the pointer-cache entry are removed here; this method
/// does not touch metadata or data files.
///
/// # Errors
/// `TableNotFound` when the table has no pointer; any redb failure.
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
    let pointer_key = table_key(&self.name, identifier);

    let guard = self.store.db.lock().await;
    let mut txn = guard.begin_write().map_err(map_redb)?;
    txn.set_durability(self.store.durability);
    {
        let mut tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
        let present = tables_tbl
            .get(pointer_key.as_str())
            .map_err(map_redb)?
            .is_some();
        if !present {
            return Err(no_such_table(identifier));
        }
        tables_tbl.remove(pointer_key.as_str()).map_err(map_redb)?;
    }
    txn.commit().map_err(map_redb)?;

    // Evict from the pointer cache only after the removal is committed.
    self.store.pointers.remove(&pointer_key);
    Ok(())
}
/// Reports whether table `identifier` has a catalog pointer.
///
/// A hit in the pointer cache answers immediately; otherwise the `TABLES` table is
/// read under the database lock.
async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
    let key = table_key(&self.name, identifier);

    let cached = self.store.pointers.get(&key).is_some();
    if cached {
        return Ok(true);
    }

    let guard = self.store.db.lock().await;
    let txn = guard.begin_read().map_err(map_redb)?;
    let tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
    let stored = tables_tbl.get(key.as_str()).map_err(map_redb)?;
    Ok(stored.is_some())
}
/// Moves the catalog pointer of `src` to `dest` in one write transaction.
///
/// Renaming a table to itself is a no-op. The pointer cache is updated only after
/// the transaction has committed.
///
/// # Errors
/// - `NamespaceNotFound` when the destination namespace is absent.
/// - `TableNotFound` when `src` has no pointer.
/// - `TableAlreadyExists` when `dest` already has a pointer.
/// - Any redb failure.
///
/// NOTE(review): only the `TABLES` row is moved here; entries previously recorded via
/// `record_commit` under the source key are not rewritten in this method. Confirm
/// whether snapshot lookups after a rename are expected to work.
async fn rename_table(&self, src: &TableIdent, dest: &TableIdent) -> Result<()> {
    if src == dest {
        return Ok(());
    }

    let guard = self.store.db.lock().await;
    let src_key = table_key(&self.name, src);
    let dest_key = table_key(&self.name, dest);
    let mut txn = guard.begin_write().map_err(map_redb)?;
    txn.set_durability(self.store.durability);

    {
        // Scoped so the namespace table handle is released before TABLES is opened.
        let ns_tbl = txn.open_table(NAMESPACES).map_err(map_redb)?;
        let dest_ns_key = namespace_key(&self.name, dest.namespace());
        let dest_ns_exists = ns_tbl
            .get(dest_ns_key.as_str())
            .map_err(map_redb)?
            .is_some();
        if !dest_ns_exists {
            return Err(no_such_namespace(dest.namespace()));
        }
    }

    let metadata_location = {
        let mut tables_tbl = txn.open_table(TABLES).map_err(map_redb)?;
        let current = tables_tbl
            .get(src_key.as_str())
            .map_err(map_redb)?
            .map(|entry| entry.value().to_string());
        let location = match current {
            Some(found) => found,
            None => return Err(no_such_table(src)),
        };
        let dest_taken = tables_tbl
            .get(dest_key.as_str())
            .map_err(map_redb)?
            .is_some();
        if dest_taken {
            return Err(table_already_exists(dest));
        }
        tables_tbl.remove(src_key.as_str()).map_err(map_redb)?;
        tables_tbl
            .insert(dest_key.as_str(), location.as_str())
            .map_err(map_redb)?;
        location
    };

    txn.commit().map_err(map_redb)?;
    self.store.pointers.remove(&src_key);
    self.store.pointers.insert(&dest_key, &metadata_location);
    Ok(())
}
async fn register_table(
&self,
table_ident: &TableIdent,
metadata_location: String,
) -> Result<Table> {
if self.table_exists(table_ident).await? {
return Err(table_already_exists(table_ident));
}
let metadata = TableMetadata::read_from(&self.fileio, &metadata_location).await?;
{
let db = self.store.db.lock().await;
let mut write = db.begin_write().map_err(map_redb)?;
write.set_durability(self.store.durability);
{
let mut tables_tbl = write.open_table(TABLES).map_err(map_redb)?;
let key = table_key(&self.name, table_ident);
if tables_tbl.get(key.as_str()).map_err(map_redb)?.is_some() {
return Err(table_already_exists(table_ident));
}
tables_tbl
.insert(key.as_str(), metadata_location.as_str())
.map_err(map_redb)?;
}
crate::store::record_commit(
&write,
&table_key(&self.name, table_ident),
metadata.current_snapshot_id(),
&metadata_location,
)
.map_err(map_redb)?;
write.commit().map_err(map_redb)?;
self.store
.pointers
.insert(&table_key(&self.name, table_ident), &metadata_location);
}
self.store.maybe_trigger_compaction();
Ok(Table::builder()
.identifier(table_ident.clone())
.metadata_location(metadata_location)
.metadata(metadata)
.file_io(self.fileio.clone())
.build()?)
}
/// Applies `commit` to the table it names and persists the result.
///
/// The metadata location read before applying the commit is passed to
/// `persist_commit` as the expected base.
///
/// # Errors
/// Table load failures, failures applying the commit, and every error from
/// `persist_commit`.
async fn update_table(&self, commit: TableCommit) -> Result<Table> {
    let ident = commit.identifier().clone();
    let base_table = self.load_table(&ident).await?;
    let base_location = base_table.metadata_location_result()?.to_string();
    let staged = commit.apply(base_table)?;
    self.persist_commit(ident, base_location, staged).await
}
}