mod compound_lowering;
mod expand_struct;
mod pipeline_normalizers;
mod statement_normalizers;
mod unique;
use std::rc::Rc;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::ast::{IntoTyped, TypeCheckExecutor};
use hamelin_lib::tree::builder::query;
use hamelin_lib::tree::typed_ast::context::StatementTranslationContext;
use hamelin_lib::tree::typed_ast::pipeline::TypedPipeline;
use hamelin_lib::tree::typed_ast::query::TypedStatement;
use pipeline_normalizers::{
expand_array_literals, extract_window_aggregates, fuse_projections, lower_nest, lower_parse,
lower_unnest, normalize_agg, normalize_explode, normalize_window, normalize_within,
};
use statement_normalizers::{
expand_union_schemas, from_to_union, lower_joins, lower_match, nest_from_aliases,
};
use pipeline_normalizers::align_append_schema;
pub fn normalize_statement(
statement: Rc<TypedStatement>,
ctx: &mut StatementTranslationContext,
) -> Result<Rc<TypedStatement>, Rc<TranslationError>> {
let statement = lower_match(statement, ctx)?;
let statement = lower_joins(statement, ctx)?;
let statement = nest_from_aliases(statement, ctx)?;
let statement = from_to_union(statement, ctx)?;
let statement = expand_union_schemas(statement, ctx)?;
let mut builder = query();
for with_clause in &statement.with_clauses {
let name = with_clause.name.valid_ref()?;
let normalized = normalize_pipeline(with_clause.pipeline.clone(), ctx)?;
ctx.register_cte(name.clone(), normalized.environment());
builder = builder.with(name.clone(), normalized);
}
let normalized_main = normalize_pipeline(statement.pipeline.clone(), ctx)?;
let normalized_main = align_append_schema(normalized_main, ctx)?;
Ok(Rc::new(
builder
.main(normalized_main)
.build()
.typed_with()
.with_registry(ctx.registry.clone())
.with_provider(ctx.provider.clone())
.typed(),
))
}
fn normalize_pipeline(
pipeline: Rc<TypedPipeline>,
ctx: &mut StatementTranslationContext,
) -> Result<Rc<TypedPipeline>, Rc<TranslationError>> {
let pipeline = normalize_within(pipeline, ctx)?;
let pipeline = normalize_agg(pipeline, ctx)?;
let pipeline = normalize_window(pipeline, ctx)?;
let pipeline = extract_window_aggregates(pipeline, ctx)?;
let pipeline = normalize_explode(pipeline, ctx)?;
let pipeline = lower_unnest(pipeline, ctx)?;
let pipeline = lower_parse(pipeline, ctx)?;
let pipeline = lower_nest(pipeline, ctx)?;
let pipeline = expand_array_literals(pipeline, ctx)?;
let pipeline = fuse_projections(pipeline, ctx)?;
Ok(pipeline)
}