hamelin_translation 0.6.12

Lowering and IR for Hamelin query language
Documentation
//! Normalization passes for typed AST.
//!
//! Normalization transforms typed AST into a more uniform form for translation to SQL.
//! Each pass transforms AST and re-typechecks to maintain type safety.
//!
//! ## Pass Types
//!
//! **Statement normalizers** (`statement_normalizers/`):
//! Transform a full `TypedStatement`, may generate new CTEs.
//! - `lower_match` - Lowers MATCH to FROM + LET + WHERE + WINDOW + WHERE + DROP (must run first)
//! - `nest_from_aliases` - Converts aliased FROM to CTEs with NEST for alias nesting
//! - `from_to_union` - Converts multi-source FROM to UNION
//! - `expand_union_schemas` - Generates CTEs for UNION with differing schemas
//!
//! **Pipeline normalizers** (`pipeline_normalizers/`):
//! Transform a single pipeline without generating CTEs.
//! - `lower_broadcast_apply` - BroadcastApply → transform(array, lambda)
//! - `normalize_within` - WITHIN → WHERE with explicit timestamp bounds
//! - `normalize_agg` - AGG compound identifiers → flat AGG + LET/DROP
//! - `normalize_window` - WINDOW compound identifiers → flat WINDOW + LET/DROP
//! - `normalize_explode` - EXPLODE compound identifiers → flat EXPLODE + LET/DROP
//! - `lower_unnest` - UNNEST → EXPLODE (if array) + LET + DROP
//! - `lower_parse` - PARSE → LET + WHERE (regex extraction + filter)
//! - `lower_nest` - NEST → SELECT with compound identifiers (struct packing)
//! - `expand_array_literals` - Expands array literal elements to match element type
//! - `desugar_in_array_literals` - Rewrites `x IN [1,2,3]` to `x IN (1,2,3)` when all elements are literals
//! - `align_append_schema` - Inserts SELECT before APPEND to align pipeline to target table schema
//! - `lower_transform` - transform(arr, lambda) → EXPLODE + AGG (conditional, for DataFusion)
//! - `fuse_projections` - Fuses LET/DROP/SELECT into minimal SELECT commands (must be last)
//!
//! ## Helpers
//!
//! - `compound_lowering` - Shared logic for AGG/WINDOW compound identifier lowering
//! - `expand_struct` - Struct widening for array literals and FROM schema expansion
//! - `unique` - Unique name generation for synthesized identifiers
//!
//! ## Pass Order
//!
//! 1. Statement normalizers (can generate CTEs)
//! 2. Pipeline normalizers (`fuse_projections` must be last to catch all LET/DROP)
//!
//! ## JOIN/LOOKUP Lowering
//!
//! JOIN/LOOKUP right-side hoisting is done during IR lowering (`ir.rs`), not
//! during normalization. This avoids double-nesting that occurred when the
//! normalization pass emitted NEST and then re-typechecked the JOIN.

mod compound_lowering;
mod expand_struct;
mod pipeline_normalizers;
mod statement_normalizers;

use std::sync::Arc;

use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::builder::query;
use hamelin_lib::tree::options::TypeCheckOptions;
use hamelin_lib::tree::typed_ast::context::StatementTranslationContext;
use hamelin_lib::tree::typed_ast::pipeline::TypedPipeline;
use hamelin_lib::tree::typed_ast::query::TypedStatement;
use hamelin_lib::type_check_with_options;

use pipeline_normalizers::{
    desugar_in_array_literals, expand_array_literals, extract_window_aggregates, fuse_projections,
    lower_broadcast_apply, lower_nest, lower_parse, lower_transform, lower_ts_trunc_interval,
    lower_unnest, normalize_agg, normalize_explode, normalize_window, normalize_within,
};
use statement_normalizers::{expand_union_schemas, from_to_union, lower_match, nest_from_aliases};

use pipeline_normalizers::align_append_schema;

/// Normalize a full statement through all passes.
///
/// This is the top-level entry point for normalization. It applies passes in order:
/// 1. Statement normalizers: `lower_match`, `nest_from_aliases`, `from_to_union`, `expand_union_schemas`
/// 2. Pipeline normalizers (for each CTE + main pipeline)
pub fn normalize_statement(
    statement: Arc<TypedStatement>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedStatement>, Arc<TranslationError>> {
    // Statement-level passes (can generate CTEs)
    let statement = if ctx.skip_statement_passes {
        statement
    } else {
        // lower_match must run first since it generates FROM with aliases
        let statement = lower_match(statement, ctx)?;
        let statement = nest_from_aliases(statement, ctx)?;
        let statement = from_to_union(statement, ctx)?;
        expand_union_schemas(statement, ctx)?
    };

    // Apply pipeline passes to each CTE + main pipeline
    let mut builder = query();

    for with_clause in &statement.with_clauses {
        let name = with_clause.name.valid_ref()?;
        let normalized = normalize_pipeline(with_clause.pipeline.clone(), ctx)?;
        ctx.register_cte(name.clone(), normalized.environment());
        builder = builder.with(name.clone(), normalized);
    }

    let normalized_main = normalize_pipeline(statement.pipeline.clone(), ctx)?;
    let built = builder.main(normalized_main).build();

    let result = Arc::new(
        type_check_with_options(
            built,
            TypeCheckOptions::builder()
                .registry(ctx.registry.clone())
                .provider(ctx.provider.clone())
                .build(),
        )
        .into_result()
        .map_err(|errors| {
            Arc::new(errors.into_iter().next().unwrap_or_else(|| {
                TranslationError::msg(
                    statement.ast.as_ref(),
                    "type_check_with_options returned Err with no errors",
                )
            }))
        })?,
    );

    Ok(result)
}

/// Normalize a single pipeline through all pipeline passes.
///
/// Pipeline-level pass contract: `Arc<TypedPipeline> -> Result<Arc<TypedPipeline>, ...>`
pub fn normalize_pipeline(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    if ctx.skip_pipeline_passes {
        return Ok(pipeline);
    }

    let pipeline = lower_broadcast_apply(pipeline, ctx)?;
    let pipeline = normalize_within(pipeline, ctx)?;
    let pipeline = normalize_agg(pipeline, ctx)?;
    let pipeline = normalize_window(pipeline, ctx)?;
    let pipeline = extract_window_aggregates(pipeline, ctx)?;
    let pipeline = normalize_explode(pipeline, ctx)?;
    let pipeline = lower_unnest(pipeline, ctx)?;
    let pipeline = lower_parse(pipeline, ctx)?;
    let pipeline = lower_nest(pipeline, ctx)?;
    // expand_array_literals can generate transform() calls for struct widening
    // so it must run BEFORE lower_transform
    let pipeline = expand_array_literals(pipeline, ctx)?;
    // Desugar IN/NOT IN with array literals to tuple form for better SQL optimization
    let pipeline = desugar_in_array_literals(pipeline, ctx)?;
    // Rewrite TsTrunc(interval) → TsTrunc(now() + interval) so backends always see timestamps
    let pipeline = lower_ts_trunc_interval(pipeline, ctx)?;
    // align_append_schema can generate transform() calls for struct widening
    // so it must run BEFORE lower_transform
    let pipeline = align_append_schema(pipeline, ctx)?;
    // Conditionally lower transform() for backends that don't support lambdas
    // Must run AFTER expand_array_literals and align_append_schema (which can generate transform calls)
    // and BEFORE fuse_projections
    let pipeline = if ctx.lower_transform {
        let pipeline = lower_transform(pipeline, ctx)?;
        // Re-run normalize_explode since lower_transform generates EXPLODE commands
        normalize_explode(pipeline, ctx)?
    } else {
        pipeline
    };

    if ctx.skip_projection_fusion {
        return Ok(pipeline);
    }

    let pipeline = fuse_projections(pipeline, ctx)?;
    Ok(pipeline)
}