hamelin_datafusion 0.6.13

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! Top-level DataFusion translation API — async functions for translating typed ASTs
//! to DataFusion LogicalPlans.
//!
//! The convenience functions (`translate`, `type_check_and_translate`,
//! `parse_and_translate`, and `parse_and_translate_with_time_range`) derive an
//! `EnvironmentProvider` automatically from the table providers registered in the
//! `SessionContext`, so callers don't need to pass one separately. The `_with_options`
//! variants instead take the provider (along with other settings) explicitly through
//! their options argument, for advanced use cases.

use std::sync::Arc;

use datafusion::prelude::SessionContext;
use hamelin_lib::catalog::CatalogProvider as HamelinCatalogProvider;
use hamelin_lib::err::TranslationErrors;
use hamelin_lib::parse_and_typecheck_with_options;
use hamelin_lib::provider::EnvironmentProvider;
use hamelin_lib::sql::expression::identifier::{Identifier, SimpleIdentifier};
use hamelin_lib::tree::ast::expression::Expression;
use hamelin_lib::tree::ast::query::Query;
use hamelin_lib::tree::options::{TranslateOptions, TypeCheckOptions};
use hamelin_lib::tree::typed_ast::query::TypedStatement;
use hamelin_lib::type_check_with_options;
use hamelin_translation::{normalize_with, NormalizationOptions};

use crate::arrow::arrow_to_hamelin_type;
use crate::statement::{translate_statement, TranslatedStatement};

// ---------------------------------------------------------------------------
// SessionContext -> EnvironmentProvider
// ---------------------------------------------------------------------------

/// Derive a Hamelin [`EnvironmentProvider`] from the tables registered in a DataFusion
/// [`SessionContext`].
///
/// Every catalog, schema, and table reachable through the session's catalog list is
/// visited. Each table's Arrow fields are converted with `arrow_to_hamelin_type` and
/// collected, in field order, into an `OrderMap` of column name to Hamelin type. The
/// table is then registered under:
///
/// * `catalog.schema.table`: always;
/// * `schema.table`: when `catalog` is the session's configured default catalog;
/// * `table`: when both `catalog` and `schema` are the configured defaults.
///
/// A table whose provider lookup errors or returns nothing is skipped instead of
/// aborting the whole build.
///
/// # Errors
///
/// The current implementation always returns `Ok`.
pub async fn catalog_provider_from_session(
    ctx: &SessionContext,
) -> Result<Arc<dyn EnvironmentProvider>, TranslationErrors> {
    let hamelin_catalog = HamelinCatalogProvider::default();
    let session_state = ctx.state();
    let config = session_state.config().options();
    let (default_catalog, default_schema) = (
        &config.catalog.default_catalog,
        &config.catalog.default_schema,
    );
    let catalogs = session_state.catalog_list();

    for catalog_name in catalogs.catalog_names() {
        let Some(df_catalog) = catalogs.catalog(&catalog_name) else {
            continue;
        };
        let in_default_catalog = catalog_name == *default_catalog;

        for schema_name in df_catalog.schema_names() {
            let Some(df_schema) = df_catalog.schema(&schema_name) else {
                continue;
            };
            let in_default_schema = in_default_catalog && schema_name == *default_schema;

            for table_name in df_schema.table_names() {
                // Skip, rather than propagate, a failing or missing provider. DataFusion's
                // system tables (e.g. information_schema) can error while being built,
                // and that must not stop the user's own tables from being registered.
                let table = match df_schema.table(&table_name).await {
                    Ok(Some(table)) => table,
                    Ok(None) | Err(_) => continue,
                };

                let columns: ordermap::OrderMap<_, _> = table
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| {
                        (
                            SimpleIdentifier::new(field.name()),
                            arrow_to_hamelin_type(field.data_type()),
                        )
                    })
                    .collect();

                let table_id = SimpleIdentifier::new(&table_name);
                let schema_id = SimpleIdentifier::new(&schema_name);
                let catalog_id = SimpleIdentifier::new(&catalog_name);

                // catalog.schema.table is registered unconditionally.
                let fully_qualified = Identifier::from_simples(vec![
                    catalog_id,
                    schema_id.clone(),
                    table_id.clone(),
                ]);
                if let Some(id) = fully_qualified {
                    hamelin_catalog.set(id, columns.clone());
                }

                // schema.table resolves without naming the default catalog.
                if in_default_catalog {
                    if let Some(id) = Identifier::from_simples(vec![schema_id, table_id.clone()]) {
                        hamelin_catalog.set(id, columns.clone());
                    }
                }

                // A bare table name resolves inside the default catalog and schema.
                if in_default_schema {
                    hamelin_catalog.set(table_id.into(), columns);
                }
            }
        }
    }

    Ok(Arc::new(hamelin_catalog))
}

// ---------------------------------------------------------------------------
// typed AST -> DataFusion LogicalPlan
// ---------------------------------------------------------------------------

/// Translate a type-checked Hamelin statement into a DataFusion `LogicalPlan`.
///
/// This is the zero-configuration entry point. The environment provider is built from
/// the tables registered in `ctx` (see [`catalog_provider_from_session`]), and every
/// other [`TranslateOptions`] field keeps its builder default. Use
/// [`translate_with_options`] to supply the provider or other options explicitly.
pub async fn translate(
    typed: impl Into<Arc<TypedStatement>>,
    ctx: &SessionContext,
) -> Result<TranslatedStatement, TranslationErrors> {
    let options = TranslateOptions::builder()
        .provider(catalog_provider_from_session(ctx).await?)
        .build();
    translate_with_options(typed, ctx, options).await
}

/// Translate a type-checked Hamelin statement into a DataFusion `LogicalPlan` using
/// explicit [`TranslateOptions`].
///
/// The statement is first lowered through Hamelin normalization. Transform lowering is
/// always enabled, because DataFusion has no lambda support. The registry and provider
/// come from `opts`, and the optional timestamp and message field overrides are applied
/// only when present. The lowered IR is then translated against `ctx`.
///
/// # Errors
///
/// Normalization errors are propagated with `?`. A translation failure is converted
/// into a single-error [`TranslationErrors`].
pub async fn translate_with_options(
    typed: impl Into<Arc<TypedStatement>>,
    ctx: &SessionContext,
    opts: TranslateOptions,
) -> Result<TranslatedStatement, TranslationErrors> {
    let statement: Arc<TypedStatement> = typed.into();

    let base: NormalizationOptions = normalize_with()
        .with_lower_transform()
        .with_registry(opts.registry)
        .with_provider(opts.provider);
    let with_timestamp = match opts.timestamp_field {
        Some(field) => base.with_timestamp_field(field),
        None => base,
    };
    let normalization = match opts.message_field {
        Some(field) => with_timestamp.with_message_field(field),
        None => with_timestamp,
    };

    let lowered = normalization.lower(statement)?;
    translate_statement(&lowered, ctx)
        .await
        .map_err(|err| err.as_ref().clone().single())
}

// ---------------------------------------------------------------------------
// Hamelin AST -> DataFusion LogicalPlan  (typecheck + translate)
// ---------------------------------------------------------------------------

/// Type-check a parsed Hamelin query and translate it into a DataFusion `LogicalPlan`.
///
/// The environment provider is built from the tables registered in `ctx` (see
/// [`catalog_provider_from_session`]). Every other [`TypeCheckOptions`] field keeps its
/// builder default. Use [`type_check_and_translate_with_options`] to configure them.
pub async fn type_check_and_translate(
    ast: impl Into<Arc<Query>>,
    ctx: &SessionContext,
) -> Result<TranslatedStatement, TranslationErrors> {
    let session_provider = catalog_provider_from_session(ctx).await?;
    let options = TypeCheckOptions::builder()
        .provider(session_provider)
        .build();
    type_check_and_translate_with_options(ast, ctx, options).await
}

/// Type-check a parsed Hamelin query with explicit [`TypeCheckOptions`] and translate
/// the result into a DataFusion `LogicalPlan`.
///
/// The registry, provider, and optional timestamp/message fields are copied into a
/// [`TranslateOptions`] before `opts` is handed to the type checker. This ensures
/// translation uses the same registry, provider, and field settings as type checking.
///
/// # Errors
///
/// Propagates type-checking errors, and any error from [`translate_with_options`].
pub async fn type_check_and_translate_with_options(
    ast: impl Into<Arc<Query>>,
    ctx: &SessionContext,
    opts: TypeCheckOptions,
) -> Result<TranslatedStatement, TranslationErrors> {
    // The type checker consumes `opts`, so snapshot the translation settings first.
    let registry = opts.registry.clone();
    let provider = opts.provider.clone();
    let timestamp_field = opts.timestamp_field.clone();
    let message_field = opts.message_field.clone();
    let translate_opts = TranslateOptions::builder()
        .registry(registry)
        .provider(provider)
        .maybe_timestamp_field(timestamp_field)
        .maybe_message_field(message_field)
        .build();

    let checked = type_check_with_options(ast, opts).into_result()?;
    translate_with_options(Arc::new(checked), ctx, translate_opts).await
}

// ---------------------------------------------------------------------------
// Hamelin string -> DataFusion LogicalPlan  (parse + typecheck + translate)
// ---------------------------------------------------------------------------

/// Parse, type-check, and translate a Hamelin query string into a DataFusion
/// `LogicalPlan` in a single call.
///
/// The environment provider is built from the tables registered in `ctx` (see
/// [`catalog_provider_from_session`]). All other [`TypeCheckOptions`] fields keep their
/// builder defaults. Use [`parse_and_translate_with_options`] to configure them.
pub async fn parse_and_translate(
    input: impl Into<String>,
    ctx: &SessionContext,
) -> Result<TranslatedStatement, TranslationErrors> {
    let options = TypeCheckOptions::builder()
        .provider(catalog_provider_from_session(ctx).await?)
        .build();
    parse_and_translate_with_options(input, ctx, options).await
}

/// Parse, type-check, and translate a Hamelin query string into a DataFusion
/// `LogicalPlan`, forwarding an optional time-range expression to the type checker.
///
/// `time_range` is passed through `TypeCheckOptions::maybe_time_range`. The
/// environment provider is built from the tables registered in `ctx` (see
/// [`catalog_provider_from_session`]).
pub async fn parse_and_translate_with_time_range(
    input: impl Into<String>,
    ctx: &SessionContext,
    time_range: Option<Arc<Expression>>,
) -> Result<TranslatedStatement, TranslationErrors> {
    let session_provider = catalog_provider_from_session(ctx).await?;
    let options = TypeCheckOptions::builder()
        .provider(session_provider)
        .maybe_time_range(time_range)
        .build();
    parse_and_translate_with_options(input, ctx, options).await
}

/// Parse, type-check, and translate a Hamelin query string into a DataFusion
/// `LogicalPlan` using explicit [`TypeCheckOptions`].
///
/// The registry, provider, and optional timestamp/message fields are copied into a
/// [`TranslateOptions`] before `opts` is consumed by parsing and type checking. This
/// ensures translation uses the same registry, provider, and field settings.
///
/// # Errors
///
/// Propagates parse/type-check errors, and any error from [`translate_with_options`].
pub async fn parse_and_translate_with_options(
    input: impl Into<String>,
    ctx: &SessionContext,
    opts: TypeCheckOptions,
) -> Result<TranslatedStatement, TranslationErrors> {
    // Parsing and type checking consume `opts`, so snapshot the translation settings first.
    let registry = opts.registry.clone();
    let provider = opts.provider.clone();
    let timestamp_field = opts.timestamp_field.clone();
    let message_field = opts.message_field.clone();
    let translate_opts = TranslateOptions::builder()
        .registry(registry)
        .provider(provider)
        .maybe_timestamp_field(timestamp_field)
        .maybe_message_field(message_field)
        .build();

    let checked = parse_and_typecheck_with_options(input, opts).into_result()?;
    translate_with_options(Arc::new(checked), ctx, translate_opts).await
}