use super::rest::IcebergRestCatalog;
use super::{map_catalog_error, AuthProvider, Catalog, CatalogError, CatalogOptions, RetryConfig};
use crate::error::{Error, Result};
use crate::io::FileIO;
use crate::spec::{NamespaceIdent, TableCreation, TableIdent};
use crate::table::Table;
use async_trait::async_trait;
use std::collections::HashMap;
use std::time::Duration;
#[derive(Debug)]
pub struct RestCatalog {
inner: IcebergRestCatalog,
}
impl RestCatalog {
pub fn builder(name: impl Into<String>, endpoint: impl Into<String>) -> RestCatalogBuilder {
RestCatalogBuilder::new(name, endpoint)
}
pub fn new(
name: impl Into<String>,
endpoint: impl Into<String>,
auth_provider: impl RestAuthProvider + 'static,
file_io: FileIO,
) -> Result<Self> {
Self::builder(name, endpoint)
.with_auth_provider(auth_provider)
.with_file_io(file_io)
.build()
}
}
pub struct RestCatalogBuilder {
name: String,
endpoint: String,
prefix: String,
options: CatalogOptions,
file_io: Option<FileIO>,
auth_provider: Option<Box<dyn RestAuthProvider>>,
}
impl RestCatalogBuilder {
fn new(name: impl Into<String>, endpoint: impl Into<String>) -> Self {
Self {
name: name.into(),
endpoint: endpoint.into(),
prefix: String::new(),
options: CatalogOptions::default(),
file_io: None,
auth_provider: None,
}
}
pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
self.prefix = prefix.into();
self
}
pub fn with_options(mut self, options: CatalogOptions) -> Self {
self.options = options;
self
}
pub fn with_file_io(mut self, file_io: FileIO) -> Self {
self.file_io = Some(file_io);
self
}
pub fn with_auth_provider<A>(mut self, provider: A) -> Self
where
A: RestAuthProvider + 'static,
{
self.auth_provider = Some(Box::new(provider));
self
}
pub fn with_bearer_token(mut self, token: impl Into<String>) -> Self {
self.auth_provider = Some(Box::new(crate::catalog::BearerTokenAuthProvider::new(
token,
)));
self
}
pub fn with_retry_config(mut self, retry: RetryConfig) -> Self {
self.options = self.options.with_retry_config(retry);
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
let http = self.options.http().clone().with_timeout(timeout);
self.options = self.options.with_http_config(http);
self
}
pub fn build(self) -> Result<RestCatalog> {
let file_io = self.file_io.ok_or_else(|| {
Error::invalid_config("RestCatalog requires a FileIO. Call with_file_io first.")
})?;
let auth_provider = self.auth_provider.ok_or_else(|| {
Error::invalid_config("RestCatalog requires an auth provider. Call with_auth_provider.")
})?;
let adapter: Box<dyn AuthProvider> = Box::new(ExternalAuthProvider {
inner: auth_provider,
});
let inner = IcebergRestCatalog::from_components(
self.name,
self.endpoint,
self.prefix,
adapter,
file_io,
self.options,
)
.map_err(map_catalog_error)?;
Ok(RestCatalog { inner })
}
}
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
pub trait RestAuthProvider: Send + Sync + std::fmt::Debug {
async fn sign_request(&self, request: reqwest::Request) -> Result<reqwest::Request>;
}
struct ExternalAuthProvider {
inner: Box<dyn RestAuthProvider>,
}
impl std::fmt::Debug for ExternalAuthProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ExternalAuthProvider")
.finish_non_exhaustive()
}
}
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
impl AuthProvider for ExternalAuthProvider {
async fn sign_request(&self, request: reqwest::Request) -> super::Result<reqwest::Request> {
self.inner
.sign_request(request)
.await
.map_err(map_auth_error)
}
}
fn map_auth_error(err: Error) -> CatalogError {
match err {
Error::Unauthorized { provider } => CatalogError::AuthError(provider),
Error::NetworkError { source } => CatalogError::Network(source),
Error::InvalidRequest { message } => CatalogError::InvalidRequest(message),
Error::ServerError { status, message } => CatalogError::ServerError { status, message },
_ => CatalogError::AuthError(err.to_string()),
}
}
#[cfg(not(target_family = "wasm"))]
#[async_trait]
impl Catalog for RestCatalog {
fn file_io(&self) -> &crate::io::FileIO {
self.inner.file_io()
}
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
self.inner.create_namespace(namespace, properties).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
self.inner.namespace_exists(namespace).await
}
async fn list_namespaces(&self) -> Result<Vec<NamespaceIdent>> {
self.inner.list_namespaces().await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
self.inner.table_exists(identifier).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
self.inner.create_table(namespace, creation).await
}
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
self.inner.load_table(identifier).await
}
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
self.inner.drop_table(identifier).await
}
async fn update_table_metadata(
&self,
identifier: &TableIdent,
old_metadata_location: &str,
new_metadata_location: &str,
) -> Result<()> {
self.inner
.update_table_metadata(identifier, old_metadata_location, new_metadata_location)
.await
}
async fn expire_snapshots(&self, identifier: &TableIdent, snapshot_ids: &[i64]) -> Result<()> {
self.inner.expire_snapshots(identifier, snapshot_ids).await
}
}
#[cfg(target_family = "wasm")]
#[async_trait(?Send)]
impl Catalog for RestCatalog {
fn file_io(&self) -> &crate::io::FileIO {
self.inner.file_io()
}
async fn create_namespace(
&self,
namespace: &NamespaceIdent,
properties: HashMap<String, String>,
) -> Result<()> {
self.inner.create_namespace(namespace, properties).await
}
async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
self.inner.namespace_exists(namespace).await
}
async fn list_namespaces(&self) -> Result<Vec<NamespaceIdent>> {
self.inner.list_namespaces().await
}
async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
self.inner.list_tables(namespace).await
}
async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
self.inner.table_exists(identifier).await
}
async fn create_table(
&self,
namespace: &NamespaceIdent,
creation: TableCreation,
) -> Result<Table> {
self.inner.create_table(namespace, creation).await
}
async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
self.inner.load_table(identifier).await
}
async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
self.inner.drop_table(identifier).await
}
async fn update_table_metadata(
&self,
identifier: &TableIdent,
old_metadata_location: &str,
new_metadata_location: &str,
) -> Result<()> {
self.inner
.update_table_metadata(identifier, old_metadata_location, new_metadata_location)
.await
}
async fn expire_snapshots(&self, identifier: &TableIdent, snapshot_ids: &[i64]) -> Result<()> {
self.inner.expire_snapshots(identifier, snapshot_ids).await
}
}
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
impl RestAuthProvider for crate::catalog::BearerTokenAuthProvider {
async fn sign_request(&self, request: reqwest::Request) -> Result<reqwest::Request> {
<Self as AuthProvider>::sign_request(self, request)
.await
.map_err(map_catalog_error)
}
}
#[cfg(not(target_family = "wasm"))]
#[async_trait]
impl RestAuthProvider for crate::catalog::SigV4AuthProvider {
async fn sign_request(&self, request: reqwest::Request) -> Result<reqwest::Request> {
<Self as AuthProvider>::sign_request(self, request)
.await
.map_err(map_catalog_error)
}
}
#[cfg(test)]
mod tests {
use super::*;
use opendal::Operator;
#[derive(Debug)]
struct NoopAuth;
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
impl RestAuthProvider for NoopAuth {
async fn sign_request(&self, request: reqwest::Request) -> Result<reqwest::Request> {
Ok(request)
}
}
fn memory_file_io() -> FileIO {
let operator =
Operator::via_iter(opendal::Scheme::Memory, []).expect("memory operator should build");
FileIO::new(operator)
}
#[test]
fn builder_requires_file_io() {
let err = RestCatalog::builder("test", "https://example.com/iceberg")
.with_auth_provider(NoopAuth)
.build()
.expect_err("missing FileIO should error");
assert!(matches!(err, Error::InvalidConfig { .. }));
}
#[test]
fn builder_requires_auth() {
let err = RestCatalog::builder("test", "https://example.com/iceberg")
.with_file_io(memory_file_io())
.build()
.expect_err("missing auth provider should error");
assert!(matches!(err, Error::InvalidConfig { .. }));
}
#[test]
fn builder_accepts_components() {
let result = RestCatalog::builder("test", "https://example.com/iceberg")
.with_prefix("warehouse")
.with_file_io(memory_file_io())
.with_auth_provider(NoopAuth)
.build();
assert!(result.is_ok());
}
}