hamelin_datafusion 0.6.15

Translate Hamelin TypedAST to DataFusion LogicalPlans
Documentation
//! Pipeline translation from Hamelin IR to DataFusion LogicalPlan
//!
//! This module translates a lowered Hamelin query pipeline into a DataFusion LogicalPlan
//! by iterating through each command and chaining them together.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion::common::DFSchema;
use datafusion::logical_expr::{EmptyRelation, LogicalPlan};
use datafusion::prelude::SessionContext;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::identifier::Identifier;
use hamelin_translation::{IRCommand, IRCommandKind, IRPipeline};

use crate::command::{
    translate_agg_command, translate_explode_command, translate_from_command,
    translate_join_command, translate_limit_command, translate_select_command,
    translate_sort_command, translate_where_command, translate_window_command,
};
use crate::expr::ExprTranslationContext;

/// Build the seed plan for a pipeline: an `EmptyRelation` that yields exactly one
/// row and carries an empty (zero-column) schema.
///
/// It serves as the input for commands that generate data without reading a table
/// (e.g. ROWS).
fn empty_single_row_plan() -> LogicalPlan {
    let schema = Arc::new(DFSchema::empty());
    let relation = EmptyRelation {
        schema,
        produce_one_row: true,
    };
    LogicalPlan::EmptyRelation(relation)
}

/// Translate an [`IRPipeline`] into a DataFusion [`LogicalPlan`], with CTE plans available.
///
/// Commands are translated in order, and each one receives the plan produced by its
/// predecessor. The chain is seeded with a one-row, zero-column `EmptyRelation`. A FROM
/// command discards that seed and substitutes its own source. When FROM names a table
/// present in `ctes`, the CTE's plan is used directly instead of looking up a table
/// source.
///
/// # Errors
///
/// Returns an error if the pipeline has no commands. Otherwise it propagates the first
/// error returned while translating an individual command.
pub async fn translate_pipeline_with_ctes(
    pipeline: &IRPipeline,
    ctx: &SessionContext,
    ctes: &HashMap<Identifier, Arc<LogicalPlan>>,
    expr_ctx: &ExprTranslationContext,
) -> Result<LogicalPlan, Arc<TranslationError>> {
    if pipeline.commands.is_empty() {
        let err = TranslationError::msg(pipeline, "Empty pipeline produces no plan");
        return Err(err.into());
    }

    // Seed with a single empty row so data-generating commands have an input.
    // FROM ignores this seed and supplies its own source plan.
    let mut plan = empty_single_row_plan();
    for cmd in &pipeline.commands {
        plan = translate_command(cmd, plan, ctx, ctes, expr_ctx).await?;
    }
    Ok(plan)
}

/// Translate one [`IRCommand`] on top of `input`, which is the plan built from the
/// preceding commands.
///
/// FROM ignores `input` and produces a fresh plan from a table source or CTE. Every other
/// command transforms `input`. FROM and JOIN are the only translators that take
/// `ctx`/`ctes` and are awaited. The remaining translators are synchronous.
async fn translate_command(
    command: &IRCommand,
    input: LogicalPlan,
    ctx: &SessionContext,
    ctes: &HashMap<Identifier, Arc<LogicalPlan>>,
    expr_ctx: &ExprTranslationContext,
) -> Result<LogicalPlan, Arc<TranslationError>> {
    let schema = &command.output_schema;

    match &command.kind {
        // Commands that consult the session context and CTE map.
        IRCommandKind::From(from) => {
            translate_from_command(from, ctx, ctes, schema, command, expr_ctx).await
        }
        IRCommandKind::Join(join) => {
            translate_join_command(join, input, ctx, ctes, command, expr_ctx).await
        }

        // Commands that are handed the declared output schema explicitly.
        IRCommandKind::Window(window) => {
            translate_window_command(window, input, schema, command, expr_ctx)
        }
        IRCommandKind::Explode(explode) => {
            translate_explode_command(explode, input, schema, command, expr_ctx)
        }

        // Commands that transform the input plan using only the command itself.
        IRCommandKind::Where(filter) => translate_where_command(filter, input, command, expr_ctx),
        IRCommandKind::Select(select) => translate_select_command(select, input, command, expr_ctx),
        IRCommandKind::Agg(agg) => translate_agg_command(agg, input, command, expr_ctx),
        IRCommandKind::Sort(sort) => translate_sort_command(sort, input, command, expr_ctx),
        IRCommandKind::Limit(limit) => translate_limit_command(limit, input, command, expr_ctx),
    }
}