icepick 0.4.1

Experimental Rust client for Apache Iceberg with WASM support for AWS S3 Tables and Cloudflare R2
Documentation
//! Cloudflare R2 Data Catalog implementation
//!
//! Provides a production-ready implementation of the Iceberg catalog trait for Cloudflare R2.
//! This catalog uses bearer token authentication and supports both native and WASM platforms.

use crate::catalog::rest::IcebergRestCatalog;
use crate::catalog::{map_catalog_error, Catalog, CatalogOptions};
use crate::error::Result;
use crate::spec::{NamespaceIdent, TableCreation, TableIdent};
use crate::table::Table;
use async_trait::async_trait;
use std::collections::HashMap;

/// Cloudflare R2 Data Catalog
///
/// This catalog provides access to Apache Iceberg tables stored in Cloudflare R2.
/// It uses bearer token authentication and works on all platforms including WASM.
///
/// # Platform Support
///
/// Unlike S3TablesCatalog, R2Catalog supports both native platforms and WASM
/// (wasm32-unknown-unknown), making it suitable for browser and Cloudflare Workers use cases.
///
/// Use [`R2Catalog::with_options`] to customize HTTP timeouts, retries, or to target
/// Iceberg branches other than `main`.
///
/// # Example
///
/// ```no_run
/// use icepick::R2Catalog;
/// use icepick::catalog::Catalog;
/// use icepick::spec::{TableIdent, NamespaceIdent};
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// // Create catalog for R2
/// let catalog = R2Catalog::new(
///     "my-catalog",
///     "account-id",
///     "bucket-name",
///     "api-token"
/// ).await?;
///
/// // Use the catalog
/// let namespace = NamespaceIdent::new(vec!["my_namespace".to_string()]);
/// let table_id = TableIdent::new(namespace, "my_table".to_string());
/// let table = catalog.load_table(&table_id).await?;
/// # Ok(())
/// # }
/// ```
///
/// # Authentication
///
/// The catalog uses Cloudflare API tokens for authentication. To create an API token:
///
/// 1. Log into the Cloudflare dashboard
/// 2. Go to "My Profile" → "API Tokens"
/// 3. Create a token with R2 read/write permissions
/// 4. Use the token when constructing the catalog
#[derive(Debug)]
pub struct R2Catalog {
    /// REST catalog client produced by the constructors in this file; every
    /// catalog operation and `file_io` accessor here delegates to it.
    inner: IcebergRestCatalog,
}

impl R2Catalog {
    /// Build an R2 catalog from an account, bucket and API token.
    ///
    /// Delegates to `IcebergRestCatalog::from_r2`; any failure reported by that
    /// constructor is translated with `map_catalog_error`.
    ///
    /// # Arguments
    ///
    /// * `name` - Catalog name for identification
    /// * `account_id` - Cloudflare account ID
    /// * `bucket_name` - R2 bucket name
    /// * `api_token` - Cloudflare API token with R2 permissions
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The account ID or bucket name is invalid
    /// - The API token is invalid or lacks permissions
    /// - The R2 service is unreachable
    ///
    /// # Example
    ///
    /// ```no_run
    /// use icepick::R2Catalog;
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let catalog = R2Catalog::new(
    ///     "production",
    ///     "abc123",
    ///     "my-bucket",
    ///     "cloudflare-api-token-here"
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn new(
        name: impl Into<String>,
        account_id: impl Into<String>,
        bucket_name: impl Into<String>,
        api_token: impl Into<String>,
    ) -> Result<Self> {
        // Arguments are converted left to right, matching the parameter order.
        let rest_catalog = IcebergRestCatalog::from_r2(
            name.into(),
            account_id.into(),
            bucket_name.into(),
            api_token.into(),
        )
        .await
        .map_err(map_catalog_error)?;

        Ok(Self {
            inner: rest_catalog,
        })
    }

    /// Build an R2 catalog while supplying explicit [`CatalogOptions`].
    ///
    /// Delegates to `IcebergRestCatalog::from_r2_with_options`; failures are
    /// translated with `map_catalog_error`.
    ///
    /// # Arguments
    ///
    /// * `name` - Logical catalog name.
    /// * `account_id` / `bucket_name` / `api_token` - Same as [`R2Catalog::new`].
    /// * `options` - Additional configuration (HTTP timeouts, retries, default reference).
    pub async fn with_options(
        name: impl Into<String>,
        account_id: impl Into<String>,
        bucket_name: impl Into<String>,
        api_token: impl Into<String>,
        options: CatalogOptions,
    ) -> Result<Self> {
        let rest_catalog = IcebergRestCatalog::from_r2_with_options(
            name.into(),
            account_id.into(),
            bucket_name.into(),
            api_token.into(),
            options,
        )
        .await
        .map_err(map_catalog_error)?;

        Ok(Self {
            inner: rest_catalog,
        })
    }

    /// Build an R2 catalog whose storage layer uses caller-supplied S3-compatible credentials.
    ///
    /// Instead of relying on environment variable discovery, this constructor creates the
    /// OpenDAL S3 operator itself from the given key pair, which is useful in WASM
    /// environments. The resulting `FileIO` is handed to
    /// `IcebergRestCatalog::from_r2_with_file_io`.
    ///
    /// # Arguments
    ///
    /// * `name` - Catalog name for identification
    /// * `account_id` - Cloudflare account ID
    /// * `bucket_name` - R2 bucket name
    /// * `api_token` - Cloudflare API token with R2 permissions (for catalog API)
    /// * `access_key_id` - R2 access key ID (for S3-compatible storage access)
    /// * `secret_access_key` - R2 secret access key (for S3-compatible storage access)
    /// * `options` - Additional configuration (HTTP timeouts, retries, default reference)
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - The account ID or bucket name is invalid
    /// - The API token is invalid or lacks permissions
    /// - The credentials are invalid
    /// - The R2 service is unreachable
    ///
    /// A failure to construct the S3 operator is reported as `Error::IoError`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// use icepick::{R2Catalog, catalog::CatalogOptions};
    ///
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// let catalog = R2Catalog::with_credentials(
    ///     "production",
    ///     "abc123",
    ///     "my-bucket",
    ///     "cloudflare-api-token",
    ///     "r2-access-key-id",
    ///     "r2-secret-access-key",
    ///     CatalogOptions::default(),
    /// ).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn with_credentials(
        name: impl Into<String>,
        account_id: impl Into<String>,
        bucket_name: impl Into<String>,
        api_token: impl Into<String>,
        access_key_id: impl Into<String>,
        secret_access_key: impl Into<String>,
        options: CatalogOptions,
    ) -> Result<Self> {
        let catalog_name: String = name.into();
        let account: String = account_id.into();
        let bucket: String = bucket_name.into();
        let token: String = api_token.into();
        let key_id: String = access_key_id.into();
        let secret: String = secret_access_key.into();

        // S3-compatible endpoint for this account's R2 storage.
        let storage_endpoint = format!("https://{}.r2.cloudflarestorage.com", account);

        // Storage backend configured with the explicit key pair; R2 always uses
        // the "auto" region.
        let storage_builder = opendal::services::S3::default()
            .bucket(&bucket)
            .region("auto")
            .endpoint(&storage_endpoint)
            .access_key_id(&key_id)
            .secret_access_key(&secret);

        let operator = opendal::Operator::new(storage_builder)
            .map(|pending| pending.finish())
            .map_err(|e| {
                crate::error::Error::IoError(format!("Failed to create S3 operator: {}", e))
            })?;

        let file_io = crate::io::FileIO::new(operator);

        // Catalog-side settings; no endpoint override is applied here.
        let r2_config = crate::catalog::R2Config {
            account_id: account,
            bucket_name: bucket,
            api_token: token,
            endpoint_override: None,
        };

        let rest_catalog =
            IcebergRestCatalog::from_r2_with_file_io(catalog_name, r2_config, file_io, options)
                .await
                .map_err(map_catalog_error)?;

        Ok(Self {
            inner: rest_catalog,
        })
    }

    /// Borrow the `FileIO` held by the inner REST catalog, for reading data files.
    pub fn file_io(&self) -> &crate::io::FileIO {
        self.inner.file_io()
    }
}

// `Catalog` implementation for `R2Catalog`: every method forwards to the inner
// `IcebergRestCatalog`.
//
// One impl serves both target families so the delegation cannot drift between
// platforms. Only the `async_trait` flavour differs: native builds use the default
// expansion, while WASM builds use `?Send`, which drops the `Send` bound from the
// generated futures. Exactly one of the two `cfg_attr` lines is active per target.
#[cfg_attr(not(target_family = "wasm"), async_trait)]
#[cfg_attr(target_family = "wasm", async_trait(?Send))]
impl Catalog for R2Catalog {
    /// Borrow the `FileIO` of the inner REST catalog.
    fn file_io(&self) -> &crate::io::FileIO {
        self.inner.file_io()
    }

    /// Forward namespace creation, with its properties, to the inner catalog.
    async fn create_namespace(
        &self,
        namespace: &NamespaceIdent,
        properties: HashMap<String, String>,
    ) -> Result<()> {
        self.inner.create_namespace(namespace, properties).await
    }

    /// Forward the namespace existence check to the inner catalog.
    async fn namespace_exists(&self, namespace: &NamespaceIdent) -> Result<bool> {
        self.inner.namespace_exists(namespace).await
    }

    /// Forward namespace listing to the inner catalog.
    async fn list_namespaces(&self) -> Result<Vec<NamespaceIdent>> {
        self.inner.list_namespaces().await
    }

    /// Forward table listing for `namespace` to the inner catalog.
    async fn list_tables(&self, namespace: &NamespaceIdent) -> Result<Vec<TableIdent>> {
        self.inner.list_tables(namespace).await
    }

    /// Forward the table existence check to the inner catalog.
    async fn table_exists(&self, identifier: &TableIdent) -> Result<bool> {
        self.inner.table_exists(identifier).await
    }

    /// Forward table creation in `namespace` to the inner catalog.
    async fn create_table(
        &self,
        namespace: &NamespaceIdent,
        creation: TableCreation,
    ) -> Result<Table> {
        self.inner.create_table(namespace, creation).await
    }

    /// Forward table loading to the inner catalog.
    async fn load_table(&self, identifier: &TableIdent) -> Result<Table> {
        self.inner.load_table(identifier).await
    }

    /// Forward table removal to the inner catalog.
    async fn drop_table(&self, identifier: &TableIdent) -> Result<()> {
        self.inner.drop_table(identifier).await
    }

    /// Forward the metadata-location swap (old -> new) to the inner catalog.
    async fn update_table_metadata(
        &self,
        identifier: &TableIdent,
        old_metadata_location: &str,
        new_metadata_location: &str,
    ) -> Result<()> {
        self.inner
            .update_table_metadata(identifier, old_metadata_location, new_metadata_location)
            .await
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Compiles only when `T` implements `Debug`; no value of `T` is needed.
    fn assert_debug<T: std::fmt::Debug>() {}

    #[test]
    fn test_r2_catalog_debug() {
        // A catalog cannot be constructed without valid credentials, so this is
        // purely a compile-time check that `R2Catalog` exists and is `Debug`.
        assert_debug::<R2Catalog>();
    }
}