hamelin_datafusion 0.7.4

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! Statement translation from Hamelin IR to DataFusion LogicalPlan
//!
//! This module translates a lowered Hamelin statement (including CTEs) into a
//! DataFusion LogicalPlan.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::logical_expr::builder::subquery_alias;
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
use hamelin_lib::catalog::Column;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::identifier::Identifier;
use hamelin_translation::{IRSideEffect, IRStatement};

use crate::expr::ExprTranslationContext;
use crate::pipeline::translate_pipeline_with_ctes;

/// Result of translating a statement.
///
/// Produced by [`translate_statement`]. A statement translates to exactly one
/// of two shapes: a query whose rows are returned to the caller, or a DML
/// operation that appends the rows of a source plan to a target table.
pub enum TranslatedStatement {
    /// A query that returns data, with its Hamelin output schema
    Query {
        /// The DataFusion logical plan for the statement's main pipeline.
        plan: LogicalPlan,
        /// The Hamelin type schema (column names and types from the typed AST)
        output_schema: Vec<Column>,
    },
    /// A DML statement that inserts data into a target table
    Dml {
        /// The source data plan (everything before APPEND)
        source_plan: LogicalPlan,
        /// The target table identifier
        target_table: Identifier,
        /// Columns for deduplication (empty = plain INSERT, non-empty = anti-join + INSERT)
        ///
        /// This module only carries the list through; the deduplication
        /// itself is performed by whoever consumes this value.
        distinct_by: Vec<Identifier>,
    },
}

impl TranslatedStatement {
    /// Consumes the statement and extracts its query parts.
    ///
    /// Returns `Some((plan, output_schema))` for the `Query` variant and
    /// `None` for the `Dml` variant.
    pub fn query(self) -> Option<(LogicalPlan, Vec<Column>)> {
        match self {
            Self::Query {
                plan,
                output_schema,
            } => Some((plan, output_schema)),
            Self::Dml { .. } => None,
        }
    }

    /// Consumes the statement and extracts its DML parts.
    ///
    /// Returns `Some((source_plan, target_table, distinct_by))` for the `Dml`
    /// variant and `None` for the `Query` variant.
    pub fn dml(self) -> Option<(LogicalPlan, Identifier, Vec<Identifier>)> {
        match self {
            Self::Dml {
                source_plan,
                target_table,
                distinct_by,
            } => Some((source_plan, target_table, distinct_by)),
            Self::Query { .. } => None,
        }
    }
}

/// Translate an `IRStatement` (CTEs plus main pipeline) into a DataFusion plan.
///
/// # Preconditions
///
/// The statement must already have been lowered via
/// `hamelin_translation::lower()`. Lowering ensures:
/// - Compound identifiers in AGG/WINDOW are flattened to simple identifiers
/// - LET/DROP commands are fused into SELECT commands
/// - WITHIN is converted to WHERE
/// - PARSE is converted to LET + WHERE
/// - UNNEST (struct) is converted to SELECT
///
/// `transform()` calls must also be lowered beforehand. Use
/// `hamelin_translation::normalize_with().with_lower_transform()` to enable
/// transform lowering during IR lowering. DataFusionProvidedExecutor does this
/// automatically.
///
/// # Steps
///
/// 1. Each CTE pipeline is translated, in declaration order, to a
///    `LogicalPlan` wrapped in a `SubqueryAlias` named after the CTE.
/// 2. The accumulated CTE plans are handed to the main pipeline translation,
///    so a FROM clause that references a CTE uses the CTE plan directly.
/// 3. If the statement's side effect is `IRSideEffect::Append`, the result is
///    [`TranslatedStatement::Dml`]; otherwise it is
///    [`TranslatedStatement::Query`] carrying the pipeline's output schema.
///
/// Tables are resolved from the `SessionContext`'s registered table providers.
/// For statements that don't use FROM (e.g., ROWS-based queries), the context
/// doesn't need any tables registered.
///
/// # Errors
///
/// Returns an error if translating any CTE pipeline or the main pipeline
/// fails, or if wrapping a CTE plan in its subquery alias fails (that error is
/// wrapped with the offending WITH clause via `TranslationError::wrap`).
pub async fn translate_statement(
    statement: &IRStatement,
    ctx: &SessionContext,
) -> Result<TranslatedStatement, Arc<TranslationError>> {
    // One expression translation context is shared by every pipeline in the statement.
    let expr_ctx = ExprTranslationContext::default();

    // CTE plans translated so far, keyed by CTE name. The map is handed to each
    // subsequent translation, which is what lets a later CTE reference an earlier one.
    let mut cte_plans: HashMap<Identifier, Arc<LogicalPlan>> = HashMap::new();

    for with_clause in &statement.with_clauses {
        // Only the CTEs declared before this one are visible to its pipeline.
        let translated =
            translate_pipeline_with_ctes(&with_clause.pipeline, ctx, &cte_plans, &expr_ctx).await?;

        // Give the plan the CTE's name so its schema is qualified correctly.
        let alias = with_clause.name.as_str().to_string();
        let named = subquery_alias(translated, &alias)
            .map_err(|e| Arc::new(TranslationError::wrap(with_clause, e)))?;

        let key: Identifier = with_clause.name.clone().into();
        cte_plans.insert(key, Arc::new(named));
    }

    // The main pipeline no longer contains APPEND in its command list; that is
    // carried separately as the statement's side effect.
    let plan =
        translate_pipeline_with_ctes(&statement.pipeline, ctx, &cte_plans, &expr_ctx).await?;

    let side_effect = &statement.side_effect;
    let translated = if let IRSideEffect::Append { table, distinct_by } = side_effect {
        // DML: the translated pipeline is the source of the rows to append.
        TranslatedStatement::Dml {
            source_plan: plan,
            target_table: table.clone(),
            distinct_by: distinct_by.clone(),
        }
    } else {
        // Plain query: pair the plan with the Hamelin-level output schema.
        let output_schema: Vec<Column> = statement.pipeline.output_schema.as_struct().into();
        TranslatedStatement::Query {
            plan,
            output_schema,
        }
    };

    Ok(translated)
}