// hamelin_datafusion 0.6.10
//
// Translate Hamelin TypedAST to DataFusion LogicalPlans.
//! Iceberg table resolution for DataFusion.
//!
//! Provides two modes of Iceberg integration:
//! - **Catalog mode**: Connect to a REST catalog and resolve tables on-demand
//!   based on what the parsed query references.
//! - **Dataset mode**: Load a single table directly from an S3 metadata location.

use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use hamelin_executor::executor::ExecutorError;
use hamelin_lib::catalog::Column;
use iceberg::io::{FileIOBuilder, LocalFsStorageFactory, StorageFactory};
use iceberg::table::StaticTable;
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableIdent};
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_datafusion::{IcebergStaticTableProvider, IcebergTableProvider};
use iceberg_storage_opendal::OpenDalStorageFactory;

use crate::arrow::arrow_schema_to_columns;

/// Build an Iceberg REST catalog client.
///
/// `uri` and `warehouse` always win over any duplicates supplied through
/// `extra_properties`; the duplicates are silently discarded.
///
/// AWS credentials are resolved natively by iceberg/opendal/reqsign from
/// environment variables, IRSA, IMDS, or shared config files.
///
/// # Errors
///
/// Returns [`ExecutorError::ConfigurationError`] when the REST catalog
/// cannot be connected to or the configuration is rejected.
pub async fn build_iceberg_rest_catalog(
    name: &str,
    uri: &str,
    warehouse: Option<&str>,
    extra_properties: HashMap<String, String>,
) -> Result<Arc<dyn Catalog>, ExecutorError> {
    // Start from the caller-supplied extras, dropping the reserved keys so
    // the explicit arguments below cannot be overridden.
    let mut props: HashMap<String, String> = extra_properties
        .into_iter()
        .filter(|(key, _)| key != "uri" && key != "warehouse")
        .collect();
    props.insert("uri".to_string(), uri.to_string());
    if let Some(wh) = warehouse {
        props.insert("warehouse".to_string(), wh.to_string());
    }

    match RestCatalogBuilder::default().load(name, props).await {
        Ok(catalog) => Ok(Arc::new(catalog)),
        Err(e) => Err(ExecutorError::ConfigurationError(
            anyhow::anyhow!(
                "Failed to connect to Iceberg REST catalog '{}': {}",
                name,
                e
            )
            .into(),
        )),
    }
}

/// Load a single table from an Iceberg catalog and convert its schema to Hamelin columns.
///
/// `namespace` may be multi-level, with levels separated by dots
/// (e.g. `"db.schema"`), matching the common Iceberg REST convention.
///
/// Uses the catalog-backed `IcebergTableProvider` which supports both reads and writes.
/// Returns the Hamelin columns and a DataFusion `TableProvider` for the table.
///
/// # Errors
///
/// Returns [`ExecutorError::ConfigurationError`] when the namespace is
/// malformed (e.g. empty or containing empty levels) or the table cannot
/// be loaded from the catalog.
pub async fn resolve_iceberg_table(
    catalog: Arc<dyn Catalog>,
    namespace: &str,
    table_name: &str,
) -> Result<(Vec<Column>, Arc<dyn TableProvider>), ExecutorError> {
    // Iceberg namespaces may be multi-level; `NamespaceIdent::new` would treat
    // the whole string as a single level and fail to resolve nested namespaces,
    // so split on '.' and build a proper multi-part identifier instead.
    let namespace_ident = NamespaceIdent::from_strs(namespace.split('.')).map_err(|e| {
        ExecutorError::ConfigurationError(
            anyhow::anyhow!("Invalid Iceberg namespace '{}': {}", namespace, e).into(),
        )
    })?;

    let provider = IcebergTableProvider::try_new(catalog, namespace_ident, table_name.to_string())
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to load Iceberg table '{}.{}': {}",
                    namespace,
                    table_name,
                    e
                )
                .into(),
            )
        })?;

    let schema = provider.schema();
    let columns = arrow_schema_to_columns(&schema);

    Ok((columns, Arc::new(provider)))
}

/// Load a single table from an S3 metadata location (dataset mode).
///
/// This is the developer escape hatch — load a specific Iceberg table by its
/// metadata file location without needing a catalog.
///
/// Supported locations: `s3://`, `s3a://` (normalized to `s3://`), `file://`,
/// and plain local paths. Any other remote scheme is rejected up front so the
/// caller gets a clear error instead of a confusing local-filesystem failure.
///
/// AWS credentials and region are resolved natively by opendal/reqsign from
/// environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_REGION).
///
/// # Errors
///
/// Returns [`ExecutorError::ConfigurationError`] for unsupported schemes or
/// when the metadata file cannot be loaded / turned into a table provider.
pub async fn resolve_iceberg_metadata(
    metadata_location: &str,
    region: Option<&str>,
) -> Result<(Vec<Column>, Arc<dyn TableProvider>), ExecutorError> {
    // Normalize s3a:// (Hadoop convention) to s3:// (standard) and select storage factory.
    let (factory, normalized_location): (Arc<dyn StorageFactory>, Cow<str>) = if metadata_location
        .starts_with("s3://")
        || metadata_location.starts_with("s3a://")
    {
        let location = metadata_location
            .strip_prefix("s3a://")
            .map(|rest| Cow::Owned(format!("s3://{}", rest)))
            .unwrap_or(Cow::Borrowed(metadata_location));
        (
            Arc::new(OpenDalStorageFactory::S3 {
                configured_scheme: "s3".to_string(),
                customized_credential_load: None,
            }),
            location,
        )
    } else if metadata_location.contains("://") && !metadata_location.starts_with("file://") {
        // Fail fast on every other remote scheme (gs://, abfss://, wasb://, ...)
        // rather than silently handing the URI to the local-fs backend.
        return Err(ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Unsupported storage scheme in '{}'; only s3:// and local paths are currently supported",
                    metadata_location
                )
                .into(),
            ));
    } else {
        (
            Arc::new(LocalFsStorageFactory),
            Cow::Borrowed(metadata_location),
        )
    };
    let mut file_io_builder = FileIOBuilder::new(factory);
    if let Some(r) = region {
        file_io_builder = file_io_builder.with_prop("s3.region", r);
    }
    let file_io = file_io_builder.build();

    // Use a synthetic table identifier since we're loading directly from metadata
    let table_ident = TableIdent::new(
        NamespaceIdent::new("default".to_string()),
        "iceberg_table".to_string(),
    );

    let static_table = StaticTable::from_metadata_file(&normalized_location, table_ident, file_io)
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to load Iceberg table from metadata '{}': {}",
                    metadata_location,
                    e
                )
                .into(),
            )
        })?;

    let table = static_table.into_table();

    let provider = IcebergStaticTableProvider::try_new_from_table(table)
        .await
        .map_err(|e| {
            ExecutorError::ConfigurationError(
                anyhow::anyhow!(
                    "Failed to create table provider from metadata '{}': {}",
                    metadata_location,
                    e
                )
                .into(),
            )
        })?;

    let schema = provider.schema();
    let columns = arrow_schema_to_columns(&schema);

    Ok((columns, Arc::new(provider)))
}