use crate::catalog::rest::IcebergRestCatalog;
use crate::catalog::{Catalog, CatalogOptions};
use crate::error::{Error, Result};
use crate::spec::{NamespaceIdent, TableCreation, TableIdent};
use crate::table::Table;
use async_trait::async_trait;
use std::collections::HashMap;
#[derive(Debug)]
pub struct R2Catalog {
inner: IcebergRestCatalog,
}
impl R2Catalog {
pub async fn new(
name: impl Into<String>,
account_id: impl Into<String>,
bucket_name: impl Into<String>,
api_token: impl Into<String>,
) -> Result<Self> {
let name = name.into();
let account_id = account_id.into();
let bucket_name = bucket_name.into();
let api_token = api_token.into();
let inner = IcebergRestCatalog::from_r2(name, account_id, bucket_name, api_token)
.await
.map_err(map_catalog_error)?;
Ok(Self { inner })
}
pub async fn with_options(
name: impl Into<String>,
account_id: impl Into<String>,
bucket_name: impl Into<String>,
api_token: impl Into<String>,
options: CatalogOptions,
) -> Result<Self> {
let name = name.into();
let account_id = account_id.into();
let bucket_name = bucket_name.into();
let api_token = api_token.into();
let inner = IcebergRestCatalog::from_r2_with_options(
name,
account_id,
bucket_name,
api_token,
options,
)
.await
.map_err(map_catalog_error)?;
Ok(Self { inner })
}
}
fn map_catalog_error(e: crate::catalog::CatalogError) -> Error {
match e {
#[cfg(not(target_family = "wasm"))]
crate::catalog::CatalogError::InvalidArn(msg) => Error::invalid_arn(msg),
crate::catalog::CatalogError::AuthError(msg) => Error::unauthorized(msg),
crate::catalog::CatalogError::HttpError(msg) => Error::unexpected(msg),
crate::catalog::CatalogError::ServerError { status, message } => {
Error::server_error(status, message)
}
crate::catalog::CatalogError::Network(err) => Error::NetworkError { source: err },
crate::catalog::CatalogError::NotFound(msg) => Error::not_found(msg),
crate::catalog::CatalogError::Conflict(msg) => Error::conflict(msg),
crate::catalog::CatalogError::InvalidRequest(msg) => Error::invalid_request(msg),
crate::catalog::CatalogError::Unexpected(msg) => Error::unexpected(msg),
}
}
#[cfg(not(target_family = "wasm"))]
#[async_trait]
impl Catalog for R2Catalog {
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
self.inner.create_namespace(namespace, properties).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
self.inner.namespace_exists(namespace).await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
self.inner.table_exists(identifier).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
self.inner.create_table(namespace, creation).await
}
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
self.inner.load_table(identifier).await
}
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
self.inner.drop_table(identifier).await
}
async fn update_table_metadata(
&self,
identifier: &TableIdent,
old_metadata_location: &str,
new_metadata_location: &str,
) -> Result<()> {
self.inner
.update_table_metadata(identifier, old_metadata_location, new_metadata_location)
.await
}
}
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl Catalog for R2Catalog {
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
self.inner.create_namespace(namespace, properties).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
self.inner.namespace_exists(namespace).await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
self.inner.table_exists(identifier).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
self.inner.create_table(namespace, creation).await
}
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
self.inner.load_table(identifier).await
}
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
self.inner.drop_table(identifier).await
}
async fn update_table_metadata(
&self,
identifier: &TableIdent,
old_metadata_location: &str,
new_metadata_location: &str,
) -> Result<()> {
self.inner
.update_table_metadata(identifier, old_metadata_location, new_metadata_location)
.await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_r2_catalog_debug() {
let _type_check: fn(R2Catalog) = |c| {
let _ = format!("{:?}", c);
};
}
}