//! hamelin_datafusion 0.7.6
//!
//! Translate Hamelin TypedAST to DataFusion LogicalPlans
//!
//! Documentation
//! Statement translation from Hamelin IR to DataFusion LogicalPlan
//!
//! This module translates a lowered Hamelin statement (including CTEs) into a
//! DataFusion LogicalPlan.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::TableReference;
use datafusion::datasource::DefaultTableSource;
use datafusion::logical_expr::builder::subquery_alias;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::{ident, Expr, LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::SessionContext;
use datafusion_functions::core::expr_fn as core_fn;
use hamelin_executor::executor::ExecutorError;
use hamelin_lib::catalog::Column;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::identifier::{Identifier, SimpleIdentifier};
use hamelin_translation::{IRSideEffect, IRStatement};

use crate::expr::ExprTranslationContext;
use crate::pipeline::translate_pipeline_with_ctes;

/// Result of translating a statement
///
/// Produced by [`translate_statement`]. Consume with [`TranslatedStatement::query`]
/// or [`TranslatedStatement::dml`], or build the final INSERT plan for DML via
/// [`TranslatedStatement::into_dml_plan`].
pub enum TranslatedStatement {
    /// A query that returns data, with its Hamelin output schema
    Query {
        /// The translated DataFusion logical plan for the query
        plan: LogicalPlan,
        /// The Hamelin type schema (column names and types from the typed AST)
        output_schema: Vec<Column>,
    },
    /// A DML statement that inserts data into a target table
    Dml {
        /// The source data plan (everything before APPEND)
        source_plan: LogicalPlan,
        /// The target table identifier
        target_table: Identifier,
        /// Columns for deduplication (empty = plain INSERT, non-empty = anti-join + INSERT)
        distinct_by: Vec<Identifier>,
    },
}

impl TranslatedStatement {
    /// Consume `self`, returning the plan and Hamelin output schema when this
    /// is a [`Self::Query`]; returns `None` for DML statements.
    pub fn query(self) -> Option<(LogicalPlan, Vec<Column>)> {
        if let Self::Query {
            plan,
            output_schema,
        } = self
        {
            Some((plan, output_schema))
        } else {
            None
        }
    }

    /// Consume `self`, returning the source plan, target table identifier and
    /// DISTINCT BY columns when this is a [`Self::Dml`]; returns `None` for
    /// queries.
    pub fn dml(self) -> Option<(LogicalPlan, Identifier, Vec<Identifier>)> {
        if let Self::Dml {
            source_plan,
            target_table,
            distinct_by,
        } = self
        {
            Some((source_plan, target_table, distinct_by))
        } else {
            None
        }
    }

    /// Build the full INSERT logical plan for a DML statement, including the
    /// anti-join for DISTINCT BY. Returns an error if called on a Query.
    ///
    /// # Errors
    ///
    /// Returns [`ExecutorError::QueryError`] when:
    /// - `self` is a `Query` rather than a `Dml`
    /// - the target table identifier has zero or more than three segments
    /// - the target table is not registered in `ctx`
    /// - any intermediate plan-building step in DataFusion fails
    pub async fn into_dml_plan(self, ctx: &SessionContext) -> Result<LogicalPlan, ExecutorError> {
        let (source_plan, target_table, distinct_by) = self.dml().ok_or_else(|| {
            ExecutorError::QueryError(anyhow::anyhow!("expected DML statement, got query").into())
        })?;

        // Map the identifier segments onto DataFusion's bare/partial/full
        // table-reference forms; any other segment count is malformed.
        let segments = target_table.segments();
        let table_name = segments
            .iter()
            .map(|s| s.as_str())
            .collect::<Vec<_>>()
            .join(".");
        let table_ref = match segments {
            [name] => TableReference::bare(name.as_str()),
            [schema, name] => TableReference::partial(schema.as_str(), name.as_str()),
            [catalog, schema, name] => {
                TableReference::full(catalog.as_str(), schema.as_str(), name.as_str())
            }
            _ => {
                return Err(ExecutorError::QueryError(
                    anyhow::anyhow!("Invalid DML target table identifier").into(),
                ))
            }
        };

        let table_provider = ctx.table_provider(table_ref).await.map_err(|e| {
            ExecutorError::QueryError(
                anyhow::anyhow!("Target table '{}' not found: {}", table_name, e).into(),
            )
        })?;

        let effective_source = if distinct_by.is_empty() {
            // Plain INSERT: append the source rows as-is.
            source_plan
        } else {
            // DISTINCT BY: anti-join the source against the current target
            // table contents so only rows whose key values are not already
            // present get inserted.
            let target_scan = LogicalPlanBuilder::scan(
                table_name.clone(),
                Arc::new(DefaultTableSource::new(Arc::clone(&table_provider))),
                None,
            )
            .map_err(|e| {
                ExecutorError::QueryError(
                    anyhow::anyhow!("Failed to scan target table for DISTINCT BY: {}", e).into(),
                )
            })?
            .build()
            .map_err(|e| {
                ExecutorError::QueryError(
                    anyhow::anyhow!("Failed to build target scan: {}", e).into(),
                )
            })?;

            // Turn a (possibly compound) identifier into a column expression,
            // using get_field() for nested struct member access.
            let build_field_expr = |segments: &[SimpleIdentifier]| -> Expr {
                let mut expr = ident(segments[0].as_str());
                for seg in &segments[1..] {
                    expr = core_fn::get_field(expr, seg.as_str());
                }
                expr
            };

            // The same key expressions apply to both join sides (DataFusion
            // resolves them against each input independently), so build the
            // list once and clone it rather than mapping distinct_by twice.
            let join_keys: Vec<Expr> = distinct_by
                .iter()
                .map(|id| build_field_expr(id.segments()))
                .collect();

            LogicalPlanBuilder::from(source_plan)
                .join_with_expr_keys(
                    target_scan,
                    datafusion::logical_expr::JoinType::LeftAnti,
                    (join_keys.clone(), join_keys),
                    None,
                )
                .map_err(|e| {
                    ExecutorError::QueryError(
                        anyhow::anyhow!("Failed to build anti-join for DISTINCT BY: {}", e).into(),
                    )
                })?
                .build()
                .map_err(|e| {
                    ExecutorError::QueryError(
                        anyhow::anyhow!("Failed to finalize anti-join plan: {}", e).into(),
                    )
                })?
        };

        LogicalPlanBuilder::insert_into(
            effective_source,
            &table_name,
            Arc::new(DefaultTableSource::new(table_provider)),
            InsertOp::Append,
        )
        .map_err(|e| {
            ExecutorError::QueryError(anyhow::anyhow!("Failed to build INSERT plan: {}", e).into())
        })?
        .build()
        .map_err(|e| {
            ExecutorError::QueryError(
                anyhow::anyhow!("Failed to finalize INSERT plan: {}", e).into(),
            )
        })
    }
}

/// Translate an IRStatement into a DataFusion LogicalPlan
///
/// The statement must already have been lowered via `hamelin_translation::lower()`.
/// Lowering guarantees that:
/// - Compound identifiers in AGG/WINDOW are flattened to simple identifiers
/// - LET/DROP commands are fused into SELECT commands
/// - WITHIN is converted to WHERE
/// - PARSE is converted to LET + WHERE
/// - UNNEST (struct) is converted to SELECT
///
/// Translation proceeds in three steps:
/// 1. Each CTE pipeline becomes a LogicalPlan wrapped in a SubqueryAlias
/// 2. The accumulated CTE plans are handed to the main pipeline translation
/// 3. A FROM clause naming a CTE uses that CTE's plan directly
///
/// Note: `transform()` calls must be lowered before calling this function.
/// Use `hamelin_translation::normalize_with().with_lower_transform()` to enable
/// transform lowering during IR lowering. DataFusionProvidedExecutor does this automatically.
///
/// Tables are resolved from the `SessionContext`'s registered table providers.
/// Statements without FROM (e.g., ROWS-based queries) need no registered tables.
pub async fn translate_statement(
    statement: &IRStatement,
    ctx: &SessionContext,
) -> Result<TranslatedStatement, Arc<TranslationError>> {
    // A single expression-translation context is shared by every pipeline in
    // the statement.
    let expr_ctx = ExprTranslationContext::default();

    // Translate WITH clauses in declaration order, accumulating finished plans
    // so that later CTEs may reference earlier ones.
    let mut translated_ctes: HashMap<Identifier, Arc<LogicalPlan>> = HashMap::new();

    for with_clause in &statement.with_clauses {
        let alias = with_clause.name.as_str().to_string();

        let raw_plan =
            translate_pipeline_with_ctes(&with_clause.pipeline, ctx, &translated_ctes, &expr_ctx)
                .await?;

        // A SubqueryAlias gives the CTE its declared name in the schema.
        let aliased = subquery_alias(raw_plan, &alias)
            .map_err(|e| Arc::new(TranslationError::wrap(with_clause, e)))?;

        let key: Identifier = with_clause.name.clone().into();
        translated_ctes.insert(key, Arc::new(aliased));
    }

    // Translate the main pipeline (APPEND is no longer in the command list).
    let plan =
        translate_pipeline_with_ctes(&statement.pipeline, ctx, &translated_ctes, &expr_ctx).await?;

    // An APPEND side effect makes this a DML statement; anything else is a
    // plain query carrying its Hamelin output schema.
    match &statement.side_effect {
        IRSideEffect::Append { table, distinct_by } => Ok(TranslatedStatement::Dml {
            source_plan: plan,
            target_table: table.clone(),
            distinct_by: distinct_by.clone(),
        }),
        _ => {
            let output_schema: Vec<Column> = statement.pipeline.output_schema.as_struct().into();
            Ok(TranslatedStatement::Query {
                plan,
                output_schema,
            })
        }
    }
}