use crate::catalog::rest::IcebergRestCatalog;
use crate::catalog::{map_catalog_error, Catalog, CatalogOptions};
use crate::error::Result;
use crate::spec::{NamespaceIdent, TableCreation, TableIdent};
use crate::table::Table;
use async_trait::async_trait;
use std::collections::HashMap;
/// Catalog for tables stored in a Cloudflare R2 bucket.
///
/// This is a thin newtype over [`IcebergRestCatalog`]: the constructors pick
/// the R2-specific way of building the inner catalog, and every catalog
/// operation is forwarded to it unchanged.
#[derive(Debug)]
pub struct R2Catalog {
    /// The REST catalog that performs the actual work.
    inner: IcebergRestCatalog,
}
impl R2Catalog {
pub async fn new(
name: impl Into<String>,
account_id: impl Into<String>,
bucket_name: impl Into<String>,
api_token: impl Into<String>,
) -> Result<Self> {
let name = name.into();
let account_id = account_id.into();
let bucket_name = bucket_name.into();
let api_token = api_token.into();
let inner = IcebergRestCatalog::from_r2(name, account_id, bucket_name, api_token)
.await
.map_err(map_catalog_error)?;
Ok(Self { inner })
}
pub async fn with_options(
name: impl Into<String>,
account_id: impl Into<String>,
bucket_name: impl Into<String>,
api_token: impl Into<String>,
options: CatalogOptions,
) -> Result<Self> {
let name = name.into();
let account_id = account_id.into();
let bucket_name = bucket_name.into();
let api_token = api_token.into();
let inner = IcebergRestCatalog::from_r2_with_options(
name,
account_id,
bucket_name,
api_token,
options,
)
.await
.map_err(map_catalog_error)?;
Ok(Self { inner })
}
pub async fn with_credentials(
name: impl Into<String>,
account_id: impl Into<String>,
bucket_name: impl Into<String>,
api_token: impl Into<String>,
access_key_id: impl Into<String>,
secret_access_key: impl Into<String>,
options: CatalogOptions,
) -> Result<Self> {
let name = name.into();
let account_id_str = account_id.into();
let bucket_name_str = bucket_name.into();
let api_token = api_token.into();
let access_key_id = access_key_id.into();
let secret_access_key = secret_access_key.into();
let r2_endpoint = format!("https://{}.r2.cloudflarestorage.com", account_id_str);
use opendal::services::S3;
let s3_builder = S3::default()
.bucket(&bucket_name_str)
.region("auto") .endpoint(&r2_endpoint)
.access_key_id(&access_key_id)
.secret_access_key(&secret_access_key);
let operator = opendal::Operator::new(s3_builder)
.map_err(|e| {
crate::error::Error::IoError(format!("Failed to create S3 operator: {}", e))
})?
.finish();
let file_io = crate::io::FileIO::new(operator);
let config = crate::catalog::R2Config {
account_id: account_id_str,
bucket_name: bucket_name_str,
api_token,
endpoint_override: None,
};
let inner = IcebergRestCatalog::from_r2_with_file_io(name, config, file_io, options)
.await
.map_err(map_catalog_error)?;
Ok(Self { inner })
}
pub fn file_io(&self) -> &crate::io::FileIO {
self.inner.file_io()
}
}
// A single impl serves every target. Only the `async_trait` attribute differs:
// wasm targets use `?Send` so the generated futures are not required to be
// `Send`, all other targets use the default (`Send`) form. Selecting the
// attribute with `cfg_attr` keeps the method bodies from being duplicated and
// drifting apart.
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
impl Catalog for R2Catalog {
    /// Forwards namespace creation, including `properties`, to the inner catalog.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()> {
        self.inner.create_namespace(namespace, properties).await
    }

    /// Returns the inner catalog's answer to whether `namespace` exists.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
        self.inner.namespace_exists(namespace).await
    }

    /// Returns the table identifiers the inner catalog lists for `namespace`.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
        self.inner.list_tables(namespace).await
    }

    /// Returns the inner catalog's answer to whether `identifier` exists.
    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
        self.inner.table_exists(identifier).await
    }

    /// Forwards table creation in `namespace` to the inner catalog and returns
    /// the resulting [`Table`].
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table> {
        self.inner.create_table(namespace, creation).await
    }

    /// Loads the table named by `identifier` through the inner catalog.
    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
        self.inner.load_table(identifier).await
    }

    /// Forwards the drop request for `identifier` to the inner catalog.
    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
        self.inner.drop_table(identifier).await
    }

    /// Forwards a metadata-location update for `identifier` to the inner
    /// catalog, passing both the old and the new location through unchanged.
    async fn update_table_metadata(
        &self,
        identifier: &TableIdent,
        old_metadata_location: &str,
        new_metadata_location: &str,
    ) -> Result<()> {
        self.inner
            .update_table_metadata(identifier, old_metadata_location, new_metadata_location)
            .await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Compile-time guard: the test only builds if [`R2Catalog`] implements
    /// `Debug`. No catalog is constructed at runtime.
    #[test]
    fn test_r2_catalog_debug() {
        fn requires_debug<T: std::fmt::Debug>() {}

        requires_debug::<R2Catalog>();
    }
}