mod compound_lowering;
mod expand_struct;
mod pipeline_normalizers;
mod statement_normalizers;
use std::sync::Arc;
use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::builder::query;
use hamelin_lib::tree::options::TypeCheckOptions;
use hamelin_lib::tree::typed_ast::context::StatementTranslationContext;
use hamelin_lib::tree::typed_ast::pipeline::TypedPipeline;
use hamelin_lib::tree::typed_ast::query::TypedStatement;
use hamelin_lib::type_check_with_options;
use pipeline_normalizers::{
dedup_append_source, desugar_in_array_literals, expand_array_literals,
extract_window_aggregates, fuse_projections, lower_broadcast_apply, lower_nest, lower_parse,
lower_transform, lower_ts_trunc_interval, lower_unnest, normalize_agg, normalize_explode,
normalize_window, normalize_within,
};
use statement_normalizers::{expand_union_schemas, from_to_union, lower_match, nest_from_aliases};
use pipeline_normalizers::align_append_schema;
/// Normalizes a typed statement end-to-end: applies statement-level rewrite
/// passes, normalizes each CTE pipeline and the main pipeline, then rebuilds
/// the query and re-type-checks it.
///
/// Statement-level passes are bypassed entirely when
/// `ctx.skip_statement_passes` is set. On type-check failure, only the first
/// reported error is surfaced (or a synthetic error if the checker returned
/// `Err` with an empty error list).
pub fn normalize_statement(
    statement: Arc<TypedStatement>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedStatement>, Arc<TranslationError>> {
    // Statement-level rewrites, applied in sequence.
    let statement = if ctx.skip_statement_passes {
        statement
    } else {
        let mut stmt = lower_match(statement, ctx)?;
        stmt = nest_from_aliases(stmt, ctx)?;
        stmt = from_to_union(stmt, ctx)?;
        expand_union_schemas(stmt, ctx)?
    };
    // Rebuild the query, normalizing every CTE pipeline and registering its
    // output environment with the context as we go.
    let mut builder = query();
    for cte in &statement.with_clauses {
        let cte_name = cte.name.valid_ref()?;
        let cte_pipeline = normalize_pipeline(cte.pipeline.clone(), ctx)?;
        ctx.register_cte(cte_name.clone(), cte_pipeline.environment());
        builder = builder.with(cte_name.clone(), cte_pipeline);
    }
    let main_pipeline = normalize_pipeline(statement.pipeline.clone(), ctx)?;
    let rebuilt = builder.main(main_pipeline).build();
    // Re-type-check the rebuilt query with the context's registry/provider.
    let checked = type_check_with_options(
        rebuilt,
        TypeCheckOptions::builder()
            .registry(ctx.registry.clone())
            .provider(ctx.provider.clone())
            .build(),
    )
    .into_result()
    .map_err(|errors| {
        // Keep only the first error; guard against an empty error list.
        Arc::new(errors.into_iter().next().unwrap_or_else(|| {
            TranslationError::msg(
                statement.ast.as_ref(),
                "type_check_with_options returned Err with no errors",
            )
        }))
    })?;
    Ok(Arc::new(checked))
}
/// Applies the pipeline-level normalization passes to `pipeline` in a fixed
/// order and returns the rewritten pipeline.
///
/// All passes are skipped when `ctx.skip_pipeline_passes` is set. Transform
/// lowering (followed by a second explode normalization) runs only when
/// `ctx.lower_transform` is enabled, and projection fusion is suppressed by
/// `ctx.skip_projection_fusion`.
pub fn normalize_pipeline(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    if ctx.skip_pipeline_passes {
        return Ok(pipeline);
    }
    // Unconditional passes, applied in sequence.
    let mut current = lower_broadcast_apply(pipeline, ctx)?;
    current = normalize_within(current, ctx)?;
    current = normalize_agg(current, ctx)?;
    current = normalize_window(current, ctx)?;
    current = extract_window_aggregates(current, ctx)?;
    current = normalize_explode(current, ctx)?;
    current = lower_unnest(current, ctx)?;
    current = lower_parse(current, ctx)?;
    current = lower_nest(current, ctx)?;
    current = expand_array_literals(current, ctx)?;
    current = desugar_in_array_literals(current, ctx)?;
    current = lower_ts_trunc_interval(current, ctx)?;
    current = dedup_append_source(current, ctx)?;
    current = align_append_schema(current, ctx)?;
    // Optional transform lowering; explode normalization runs once more
    // afterwards (presumably lowering can surface new explodes — the
    // original code repeats the pass here).
    if ctx.lower_transform {
        current = lower_transform(current, ctx)?;
        current = normalize_explode(current, ctx)?;
    }
    if !ctx.skip_projection_fusion {
        current = fuse_projections(current, ctx)?;
    }
    Ok(current)
}