hamelin_translation 0.4.2

Lowering and IR for Hamelin query language
Documentation
//! Normalization passes for typed AST.
//!
//! Normalization transforms typed AST into a more uniform form for translation to SQL.
//! Each pass transforms AST and re-typechecks to maintain type safety.
//!
//! ## Pass Types
//!
//! **Statement normalizers** (`statement_normalizers/`):
//! Transform a full `TypedStatement`, may generate new CTEs.
//! - `lower_match` - Lowers MATCH to FROM + LET + WHERE + WINDOW + WHERE + DROP (must run first)
//! - `lower_joins` - Hoists JOIN/LOOKUP right sides to CTEs with NEST (must run before nest_from_aliases)
//! - `nest_from_aliases` - Converts aliased FROM to CTEs with NEST for alias nesting
//! - `from_to_union` - Converts multi-source FROM to UNION
//! - `expand_union_schemas` - Generates CTEs for UNION with differing schemas
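//! - `align_append_schema` - Inserts SELECT before APPEND to align pipeline to target table schema
//!   (exported from `pipeline_normalizers`; applied to the main pipeline only, after all pipeline passes)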
//!
//! **Pipeline normalizers** (`pipeline_normalizers/`):
//! Transform a single pipeline without generating CTEs.
//! - `lower_broadcast_apply` - BroadcastApply → transform(array, lambda)
//! - `normalize_within` - WITHIN → WHERE with explicit timestamp bounds
//! - `normalize_agg` - AGG compound identifiers → flat AGG + LET/DROP
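//! - `normalize_window` - WINDOW compound identifiers → flat WINDOW + LET/DROP
//! - `extract_window_aggregates` - Runs immediately after `normalize_window` (see that pass's own docs)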
//! - `normalize_explode` - EXPLODE compound identifiers → flat EXPLODE + LET/DROP
//! - `lower_unnest` - UNNEST → EXPLODE (if array) + LET + DROP
//! - `lower_parse` - PARSE → LET + WHERE (regex extraction + filter)
//! - `lower_nest` - NEST → SELECT with compound identifiers (struct packing)
//! - `expand_array_literals` - Expands array literal elements to match element type
//! - `fuse_projections` - Fuses LET/DROP/SELECT into minimal SELECT commands (must be last)
//!
//! ## Helpers
//!
//! - `compound_lowering` - Shared logic for AGG/WINDOW compound identifier lowering
//! - `expand_struct` - Struct widening for array literals and FROM schema expansion
//! - `unique` - Unique name generation for synthesized identifiers
//!
//! ## Pass Order
//!
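//! 1. Statement normalizers (can generate CTEs)
//! 2. Pipeline normalizers, applied to each CTE and then to the main pipeline
//!    (`fuse_projections` must be the last of these to catch all LET/DROP)
//! 3. `align_append_schema`, applied to the main pipeline only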

mod compound_lowering;
mod expand_struct;
mod pipeline_normalizers;
mod statement_normalizers;
mod unique;

use std::sync::Arc;

use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::{IntoTyped, TypeCheckExecutor};
use hamelin_lib::tree::builder::query;
use hamelin_lib::tree::typed_ast::context::StatementTranslationContext;
use hamelin_lib::tree::typed_ast::pipeline::TypedPipeline;
use hamelin_lib::tree::typed_ast::query::TypedStatement;

use pipeline_normalizers::{
    expand_array_literals, extract_window_aggregates, fuse_projections, lower_broadcast_apply,
    lower_nest, lower_parse, lower_unnest, normalize_agg, normalize_explode, normalize_window,
    normalize_within,
};
use statement_normalizers::{
    expand_union_schemas, from_to_union, lower_joins, lower_match, nest_from_aliases,
};

use pipeline_normalizers::align_append_schema;

/// Run every normalization pass over a complete statement.
///
/// Top-level entry point for normalization. Passes are applied in this order:
/// 1. Statement-level passes on the whole statement: `lower_match`, `lower_joins`,
///    `nest_from_aliases`, `from_to_union`, `expand_union_schemas`
/// 2. Pipeline-level passes (`normalize_pipeline`) on each CTE, then on the main pipeline
/// 3. `align_append_schema` on the main pipeline only
///
/// The statement is then rebuilt from the normalized pipelines and typed again via
/// `typed_with()`, using the registry and provider held by `ctx`.
///
/// # Errors
///
/// Propagates the first error returned by any pass, or by `valid_ref()` on a CTE name.
pub fn normalize_statement(
    statement: Arc<TypedStatement>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedStatement>, Arc<TranslationError>> {
    // Statement-level rewrites; these are allowed to introduce new CTEs.
    // `lower_match` goes first because it produces FROM clauses with aliases.
    let match_lowered = lower_match(statement, ctx)?;
    // `lower_joins` has to precede `nest_from_aliases` because both emit NEST.
    let joins_lowered = lower_joins(match_lowered, ctx)?;
    let aliases_nested = nest_from_aliases(joins_lowered, ctx)?;
    let unioned = from_to_union(aliases_nested, ctx)?;
    let rewritten = expand_union_schemas(unioned, ctx)?;

    // Re-assemble the statement, running the pipeline passes over every CTE in order.
    let mut rebuilt = query();
    for cte in &rewritten.with_clauses {
        let cte_name = cte.name.valid_ref()?;
        let cte_pipeline = normalize_pipeline(cte.pipeline.clone(), ctx)?;
        // Record the normalized CTE's environment in the context under its name.
        ctx.register_cte(cte_name.clone(), cte_pipeline.environment());
        rebuilt = rebuilt.with(cte_name.clone(), cte_pipeline);
    }

    let main_pipeline = normalize_pipeline(rewritten.pipeline.clone(), ctx)?;
    // Only the main pipeline gets `align_append_schema`, and only once every other
    // normalization has already been applied to it.
    let main_pipeline = align_append_schema(main_pipeline, ctx)?;

    let retyped = rebuilt
        .main(main_pipeline)
        .build()
        .typed_with()
        .with_registry(ctx.registry.clone())
        .with_provider(ctx.provider.clone())
        .typed();

    Ok(Arc::new(retyped))
}

/// Run every pipeline-level pass over one pipeline, in a fixed order.
///
/// Each pass follows the same contract:
/// `Arc<TypedPipeline> -> Result<Arc<TypedPipeline>, ...>`, so the output of one stage
/// is fed directly into the next.
///
/// # Errors
///
/// Stops at, and returns, the first error reported by any pass.
fn normalize_pipeline(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    let broadcast_lowered = lower_broadcast_apply(pipeline, ctx)?;
    let within_normalized = normalize_within(broadcast_lowered, ctx)?;
    let agg_normalized = normalize_agg(within_normalized, ctx)?;
    let window_normalized = normalize_window(agg_normalized, ctx)?;
    let window_aggs_extracted = extract_window_aggregates(window_normalized, ctx)?;
    let explode_normalized = normalize_explode(window_aggs_extracted, ctx)?;
    let unnest_lowered = lower_unnest(explode_normalized, ctx)?;
    let parse_lowered = lower_parse(unnest_lowered, ctx)?;
    let nest_lowered = lower_nest(parse_lowered, ctx)?;
    let arrays_expanded = expand_array_literals(nest_lowered, ctx)?;
    // Projection fusion runs last among these passes so that it sees the LET/DROP
    // commands emitted by the earlier stages.
    let fused = fuse_projections(arrays_expanded, ctx)?;
    Ok(fused)
}