use std::collections::HashSet;
use std::sync::Arc;
use ordermap::OrderMap;
use hamelin_lib::{
err::TranslationError,
tree::{
ast::{
command::Command,
expression::{Expression, ExpressionKind},
identifier::{CompoundIdentifier, Identifier, SimpleIdentifier},
node::Span,
},
builder::{self, field, field_ref, select_command, ExpressionBuilder},
typed_ast::{
clause::Projections,
command::{
TypedCommand, TypedCommandKind, TypedDropCommand, TypedLetCommand,
TypedSelectCommand,
},
context::StatementTranslationContext,
environment::TypeEnvironment,
expression::{TypedExpression, TypedExpressionKind},
pipeline::TypedPipeline,
},
},
types::{struct_type::Struct, Type},
};
/// Rewrites a typed pipeline so that its `LET` and `DROP` commands are
/// expressed as `SELECT` commands, merging neighbouring projection commands
/// wherever [`fuse_commands`] decides that is possible.
///
/// A pipeline that contains neither `LET` nor `DROP` is handed back untouched
/// (the very same `Arc`). Otherwise a fresh AST is assembled from the fused
/// commands, stamped with the original pipeline's span, and re-typed through
/// `TypedPipeline::from_ast_with_context` using `ctx`.
///
/// # Errors
///
/// Propagates the error reported by `valid_ref` on the pipeline, as well as
/// any error raised while fusing the commands.
pub fn fuse_projections(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    let has_fusable_command = pipeline
        .valid_ref()?
        .commands
        .iter()
        .any(command_needs_fusion);
    if !has_fusable_command {
        // Nothing to rewrite: keep the caller's pipeline as-is.
        return Ok(pipeline);
    }

    let commands = fuse_commands(&pipeline.valid_ref()?.commands)?;

    // Thread the builder through every fused command, in order.
    let fused_ast = commands
        .into_iter()
        .fold(
            builder::pipeline().at(pipeline.ast.span.clone()),
            |acc, command| acc.command(command),
        )
        .build();

    let retyped = TypedPipeline::from_ast_with_context(Arc::new(fused_ast), ctx);
    Ok(Arc::new(retyped))
}
/// Tells whether `cmd` is one of the command kinds that make the fusion pass
/// necessary, i.e. a `LET` or a `DROP`.
fn command_needs_fusion(cmd: &Arc<TypedCommand>) -> bool {
    let kind = &cmd.kind;
    matches!(kind, TypedCommandKind::Let(_) | TypedCommandKind::Drop(_))
}
/// Walks `commands` in order and collapses runs of `SELECT` / `LET` / `DROP`
/// into as few `SELECT` commands as possible.
///
/// At most one [`PendingSelect`] is open at a time:
///
/// * `SELECT` always flushes the open select and starts a new one.
/// * `LET` is merged into the open select unless one of its expressions
///   references a field the open select assigns, or one of its targets has a
///   non-literal ancestor in the open select (see
///   [`has_non_literal_ancestor`]). In those cases the open select is
///   flushed and the `LET` starts a new one.
/// * `DROP` is merged unless a dropped field has a non-literal ancestor in
///   the open select, in which case it likewise flushes and starts anew.
/// * Every other command flushes the open select and is copied through
///   unchanged.
///
/// # Errors
///
/// Propagates errors from constructing, merging into, or emitting a
/// [`PendingSelect`].
fn fuse_commands(commands: &[Arc<TypedCommand>]) -> Result<Vec<Command>, Arc<TranslationError>> {
    let mut fused: Vec<Command> = Vec::new();
    let mut open: Option<PendingSelect> = None;

    for command in commands {
        match &command.kind {
            TypedCommandKind::Select(select_cmd) => {
                if let Some(previous) = open.take() {
                    fused.push(previous.emit()?);
                }
                open = Some(PendingSelect::from_select(command, select_cmd)?);
            }
            TypedCommandKind::Let(let_cmd) => {
                let referenced = extract_field_references_from_projections(&let_cmd.projections);

                // Mergeable only when a select is open AND the LET neither reads
                // what that select writes nor writes beneath an opaque binding.
                let mergeable = open.as_ref().is_some_and(|current| {
                    let reads_pending = referenced.iter().any(|field| {
                        current
                            .assigned
                            .iter()
                            .any(|assigned| identifiers_overlap(field, assigned))
                    });
                    let writes_under_opaque =
                        let_cmd.projections.assignments.iter().any(|assignment| {
                            assignment.identifier.valid_ref().is_ok_and(|target| {
                                has_non_literal_ancestor(&current.assignments, target)
                            })
                        });
                    !(reads_pending || writes_under_opaque)
                });

                if mergeable {
                    if let Some(current) = open.as_mut() {
                        current.merge_let(command, let_cmd)?;
                    }
                } else {
                    if let Some(previous) = open.take() {
                        fused.push(previous.emit()?);
                    }
                    open = Some(PendingSelect::from_let(command, let_cmd)?);
                }
            }
            TypedCommandKind::Drop(drop_cmd) => {
                let mergeable = open.as_ref().is_some_and(|current| {
                    !drop_cmd
                        .dropped_fields
                        .iter()
                        .any(|field| has_non_literal_ancestor(&current.assignments, field))
                });

                if mergeable {
                    if let Some(current) = open.as_mut() {
                        current.merge_drop(command, drop_cmd);
                    }
                } else {
                    if let Some(previous) = open.take() {
                        fused.push(previous.emit()?);
                    }
                    open = Some(PendingSelect::from_drop(command, drop_cmd));
                }
            }
            _ => {
                // Any non-projection command acts as a barrier.
                if let Some(previous) = open.take() {
                    fused.push(previous.emit()?);
                }
                fused.push(command.ast.as_ref().clone());
            }
        }
    }

    // Flush whatever is still open once the pipeline ends.
    if let Some(previous) = open {
        fused.push(previous.emit()?);
    }
    Ok(fused)
}
/// Accumulator for a run of `SELECT` / `LET` / `DROP` commands that are being
/// fused into a single `SELECT`. Built by the `from_*` constructors, extended
/// by `merge_let` / `merge_drop`, and turned into a command by `emit`.
struct PendingSelect {
/// Explicit projections collected so far, keyed by target identifier.
/// `merge_let` places newly merged targets ahead of the existing entries.
assignments: OrderMap<Identifier, Arc<Expression>>,
/// Identifiers assigned by the commands merged so far; `fuse_commands`
/// consults this to detect a `LET` that reads a field written here.
assigned: HashSet<Identifier>,
/// Fields removed by merged `DROP` commands that could not be pruned out of
/// a struct literal already present in `assignments`.
dropped: HashSet<Identifier>,
/// Output schema of the most recently merged command; `emit` lists fields in
/// this schema's order.
output_schema: Arc<TypeEnvironment>,
/// Source span of the merged commands (widened by `expand_span`).
span: Span,
}
impl PendingSelect {
fn from_select(
command: &TypedCommand,
select_cmd: &TypedSelectCommand,
) -> Result<Self, Arc<TranslationError>> {
let mut assignments = ordermap::OrderMap::new();
let mut assigned = HashSet::new();
for assignment in &select_cmd.projections.assignments {
let id = assignment.identifier.clone().valid()?;
assignments.insert(id.clone(), assignment.expression.ast.clone());
assigned.insert(id);
}
Ok(Self {
assignments,
assigned,
dropped: HashSet::new(),
output_schema: command.output_schema.clone(),
span: command.ast.span,
})
}
fn from_let(
command: &TypedCommand,
let_cmd: &TypedLetCommand,
) -> Result<Self, Arc<TranslationError>> {
let mut assignments = ordermap::OrderMap::new();
let mut assigned = HashSet::new();
for assignment in &let_cmd.projections.assignments {
let id = assignment.identifier.clone().valid()?;
assignments.insert(id.clone(), assignment.expression.ast.clone());
assigned.insert(id);
}
Ok(Self {
assignments,
assigned,
dropped: HashSet::new(),
output_schema: command.output_schema.clone(),
span: command.ast.span,
})
}
fn from_drop(command: &TypedCommand, drop_cmd: &TypedDropCommand) -> Self {
let dropped = drop_cmd.dropped_fields.iter().cloned().collect();
Self {
assignments: ordermap::OrderMap::new(),
assigned: HashSet::new(),
dropped,
output_schema: command.output_schema.clone(),
span: command.ast.span,
}
}
fn merge_let(
&mut self,
command: &TypedCommand,
let_cmd: &TypedLetCommand,
) -> Result<(), Arc<TranslationError>> {
let resolved: Vec<_> = let_cmd
.projections
.assignments
.iter()
.map(|a| a.identifier.clone().valid().map(|id| (id, a)))
.collect::<Result<_, _>>()?;
let mut folded_into_literal = HashSet::new();
for (id, assignment) in &resolved {
if let Some((ancestor_id, child_path)) =
find_struct_literal_ancestor(&self.assignments, id)
{
if let Some(ancestor_expr) = self.assignments.get_mut(&ancestor_id) {
let folded = modify_struct_literal_field(
ancestor_expr,
&child_path,
Some(assignment.expression.ast.clone()),
);
if folded {
folded_into_literal.insert(id.clone());
}
}
}
}
let mut new_assignments = OrderMap::new();
for (id, assignment) in &resolved {
if folded_into_literal.contains(id) {
continue;
}
new_assignments.insert(id.clone(), assignment.expression.ast.clone());
self.assigned.insert(id.clone());
}
for (id, expr) in self.assignments.drain(..) {
if new_assignments
.keys()
.any(|new_id| identifiers_overlap(&id, new_id))
{
continue;
}
new_assignments.insert(id, expr);
}
self.assignments = new_assignments;
self.output_schema = command.output_schema.clone();
self.expand_span(&command.ast.span);
Ok(())
}
fn merge_drop(&mut self, command: &TypedCommand, drop_cmd: &TypedDropCommand) {
for dropped in &drop_cmd.dropped_fields {
let pruned = find_struct_literal_ancestor(&self.assignments, dropped)
.and_then(|(ancestor_id, child_path)| {
let expr = self.assignments.get_mut(&ancestor_id)?;
modify_struct_literal_field(expr, &child_path, None).then_some(())
})
.is_some();
if !pruned {
self.assignments.remove(dropped);
self.assigned.remove(dropped);
self.dropped.insert(dropped.clone());
}
}
self.output_schema = command.output_schema.clone();
self.expand_span(&command.ast.span);
}
fn expand_span(&mut self, other: &Span) {
if other.is_none() {
return;
}
if self.span.is_none() {
self.span = other.clone();
return;
}
if let (Some(self_start), Some(self_end), Some(other_start), Some(other_end)) = (
self.span.start(),
self.span.end(),
other.start(),
other.end(),
) {
let new_start = self_start.min(other_start);
let new_end = self_end.max(other_end);
self.span = Span::new(new_start, new_end);
}
}
fn emit(self) -> Result<Command, Arc<TranslationError>> {
let mut builder = select_command();
let output_struct = self.output_schema.as_struct();
let assignment_set: HashSet<_> = self.assignments.keys().cloned().collect();
let mut fields_to_emit = Vec::new();
emit_fields_in_schema_order(
&output_struct,
&self.assignments,
&assignment_set,
&self.dropped,
&[],
&mut fields_to_emit,
)?;
for (identifier, expr) in fields_to_emit {
builder = builder.named_field(identifier, expr);
}
Ok(builder.at(self.span).build())
}
}
fn any_modification_overlaps_struct(
assignments: &HashSet<Identifier>,
dropped: &HashSet<Identifier>,
struct_id: &Identifier,
) -> bool {
let overlaps_assignments = assignments
.iter()
.any(|a| a == struct_id || a.has_prefix(struct_id) || struct_id.has_prefix(a));
let overlaps_dropped = dropped
.iter()
.any(|d| d == struct_id || d.has_prefix(struct_id) || struct_id.has_prefix(d));
overlaps_assignments || overlaps_dropped
}
/// Appends to `output` one `(identifier, expression)` pair per field to be
/// projected, visiting the fields of `schema` in schema order.
///
/// For each field (its full path being `prefix` plus the field name):
/// * an explicit entry in `assignments` is emitted as-is;
/// * a struct-typed field that is touched by some assignment or drop is
///   expanded recursively into its children;
/// * anything else becomes a passthrough reference to the same path.
///
/// # Errors
///
/// Returns a fatal `TranslationError` if the computed path is empty, and
/// propagates errors from recursive calls.
fn emit_fields_in_schema_order(
    schema: &Struct,
    assignments: &OrderMap<Identifier, Arc<Expression>>,
    assignment_set: &HashSet<Identifier>,
    dropped: &HashSet<Identifier>,
    prefix: &[SimpleIdentifier],
    output: &mut Vec<(Identifier, Arc<Expression>)>,
) -> Result<(), Arc<TranslationError>> {
    for (field_name, field_type) in schema.iter() {
        let mut path: Vec<SimpleIdentifier> = Vec::with_capacity(prefix.len() + 1);
        path.extend_from_slice(prefix);
        path.push(field_name.clone().into());

        let full_id: Identifier = match path.as_slice() {
            [first, second, rest @ ..] => {
                CompoundIdentifier::new(first.clone(), second.clone(), rest.to_vec()).into()
            }
            [single] => single.clone().into(),
            [] => {
                return Err(TranslationError::fatal(
                    "fuse_projections",
                    "emit_fields_in_schema_order called with empty path".into(),
                )
                .into())
            }
        };

        // An explicit assignment always wins over schema-driven expansion.
        if let Some(assigned_expr) = assignments.get(&full_id) {
            output.push((full_id, assigned_expr.clone()));
            continue;
        }

        match field_type {
            Type::Struct(inner_struct)
                if any_modification_overlaps_struct(assignment_set, dropped, &full_id) =>
            {
                // Something beneath this struct changed: emit child by child.
                emit_fields_in_schema_order(
                    inner_struct,
                    assignments,
                    assignment_set,
                    dropped,
                    &path,
                    output,
                )?;
            }
            _ => {
                let passthrough = synthesize_passthrough_ast(&full_id);
                output.push((full_id, passthrough.into()));
            }
        }
    }
    Ok(())
}
/// Builds the expression that reads `identifier` unchanged from the input:
/// a plain field reference for a simple identifier, or a field reference to
/// the first segment wrapped in one field access per remaining segment for a
/// compound identifier.
fn synthesize_passthrough_ast(identifier: &Identifier) -> Expression {
    match identifier {
        Identifier::Simple(simple) => field_ref(simple.as_str()).build(),
        Identifier::Compound(compound) => {
            let root: Box<dyn ExpressionBuilder> = Box::new(field_ref(compound.first().as_str()));
            compound
                .rest_parts()
                .into_iter()
                .fold(root, |inner, part| -> Box<dyn ExpressionBuilder> {
                    Box::new(field(inner, part.as_str()))
                })
                .build()
        }
    }
}
/// Two identifiers overlap when they are equal or when either one is a
/// prefix of the other.
fn identifiers_overlap(a: &Identifier, b: &Identifier) -> bool {
    if a == b {
        return true;
    }
    a.has_prefix(b) || b.has_prefix(a)
}
/// Decides whether a write to (or drop of) `target` must NOT be merged into
/// the pending assignments because an ancestor of `target` is assigned there
/// in a form that cannot absorb the change.
///
/// The first prefix of `target` found in `assignments` decides the outcome:
/// * the ancestor is not a struct literal: `true`;
/// * walking the path from the ancestor down to the parent of `target` hits a
///   field that is not itself a struct literal: `true`;
/// * that walk hits a segment missing from the literal: `true`. The change
///   cannot be folded into the literal in that case (see
///   `modify_struct_literal_field`), and merging it as a separate assignment
///   would shadow the literal itself, so a barrier is the safe choice;
/// * the whole parent path exists as nested struct literals: `false`.
///
/// Returns `false` when no prefix of `target` is assigned at all.
fn has_non_literal_ancestor(
    assignments: &OrderMap<Identifier, Arc<Expression>>,
    target: &Identifier,
) -> bool {
    let segments = target.segments();
    for prefix in target.prefixes() {
        let Some(expr) = assignments.get(&prefix) else {
            continue;
        };
        let ExpressionKind::StructLiteral(ref lit) = expr.kind else {
            return true;
        };
        let prefix_len = prefix.segments().len();
        let remaining = &segments[prefix_len..];
        let mut current = lit;
        // Only the intermediate segments matter: the leaf itself may be
        // absent (a new field) or present (an overwrite / drop).
        for segment in &remaining[..remaining.len().saturating_sub(1)] {
            match current
                .fields
                .iter()
                .find(|(id, _)| id.valid_ref().is_ok_and(|n| n == segment))
            {
                Some((_, child_expr)) => match &child_expr.kind {
                    ExpressionKind::StructLiteral(nested) => current = nested,
                    _ => return true,
                },
                // Missing intermediate struct: the literal cannot absorb the
                // change, so force a barrier instead of a lossy merge.
                None => return true,
            }
        }
        return false;
    }
    false
}
/// Looks for a prefix of `target` that is assigned a struct literal.
///
/// Prefixes are tried in the order `target.prefixes()` yields them; prefixes
/// that are unassigned, or assigned something other than a struct literal,
/// are skipped. On success returns the matching prefix together with the
/// segments of `target` that follow it.
fn find_struct_literal_ancestor(
    assignments: &OrderMap<Identifier, Arc<Expression>>,
    target: &Identifier,
) -> Option<(Identifier, Vec<SimpleIdentifier>)> {
    let segments = target.segments();
    target.prefixes().into_iter().find_map(|prefix| {
        let expr = assignments.get(&prefix)?;
        if !matches!(expr.kind, ExpressionKind::StructLiteral(_)) {
            return None;
        }
        let child_path = segments[prefix.segments().len()..].to_vec();
        Some((prefix, child_path))
    })
}
/// Sets or removes a (possibly nested) field inside a struct-literal
/// expression.
///
/// `child_path` is the field path relative to the literal. Every segment
/// except the last must name an existing field whose value is itself a struct
/// literal. With `new_value = Some(v)` the last segment is set to `v` via
/// `set_field`; with `None` it is removed via `remove_field`.
///
/// Returns `true` when the edit was applied. Returns `false`, without
/// changing any field, when `child_path` is empty, `expr` is not a struct
/// literal, or an intermediate segment is missing or not a struct literal.
///
/// The expression is accessed through `Arc::make_mut`, so a shared `Arc` is
/// cloned before being modified.
fn modify_struct_literal_field(
    expr: &mut Arc<Expression>,
    child_path: &[SimpleIdentifier],
    new_value: Option<Arc<Expression>>,
) -> bool {
    // `split_last` handles the empty path without indexing or `expect`.
    let Some((leaf, parents)) = child_path.split_last() else {
        return false;
    };
    // Check the kind first so a non-literal never triggers a clone in `make_mut`.
    if !matches!(expr.kind, ExpressionKind::StructLiteral(_)) {
        return false;
    }
    let owned = Arc::make_mut(expr);
    let ExpressionKind::StructLiteral(ref mut lit) = owned.kind else {
        return false;
    };
    let mut current = lit;
    for segment in parents {
        let Some((_, child_expr)) = current.find_field_mut(segment.as_str()) else {
            return false;
        };
        let child = Arc::make_mut(child_expr);
        let ExpressionKind::StructLiteral(ref mut nested) = child.kind else {
            return false;
        };
        current = nested;
    }
    match new_value {
        Some(value) => current.set_field(leaf.as_str(), value),
        None => current.remove_field(leaf.as_str()),
    }
    true
}
/// Gathers the field references found in the expressions of every assignment
/// in `projections`.
fn extract_field_references_from_projections(projections: &Projections) -> HashSet<Identifier> {
    let mut found = HashSet::new();
    projections.assignments.iter().for_each(|assignment| {
        extract_field_references_from_expression(&assignment.expression, &mut found)
    });
    found
}
/// Adds to `refs` the field name of every `FieldReference` node that
/// `expr.find` visits, skipping names for which `valid()` fails.
///
/// The visitor always answers `false`.
/// NOTE(review): presumably this keeps `find` from stopping at a first match
/// so that the whole expression is scanned; confirm against
/// `TypedExpression::find`.
fn extract_field_references_from_expression(
    expr: &TypedExpression,
    refs: &mut HashSet<Identifier>,
) {
    expr.find(&mut |node| {
        let TypedExpressionKind::FieldReference(reference) = &node.kind else {
            return false;
        };
        if let Ok(name) = reference.field_name.clone().valid() {
            refs.insert(name.into());
        }
        false
    });
}
#[cfg(test)]
mod tests {
use super::*;
use hamelin_lib::type_check;
use hamelin_lib::{
tree::{
ast::{identifier::CompoundIdentifier, pipeline::Pipeline},
builder::{
add, array, drop_command, field_ref, let_command, pipeline, select_command,
struct_literal,
},
},
types::{struct_type::Struct, INT},
};
use pretty_assertions::assert_eq;
use rstest::rstest;
use std::sync::Arc;
#[rstest]
#[case::no_fusion_needed(
pipeline()
.command(select_command().named_field("a", 1).named_field("b", 2).build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).named_field("b", 2).build())
.build(),
Struct::default().with_str("a", INT).with_str("b", INT)
)]
#[case::select_let_fused(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", 2).build())
.build(),
pipeline()
.command(select_command().named_field("b", 2).named_field("a", 1).build())
.build(),
Struct::default().with_str("b", INT).with_str("a", INT)
)]
#[case::select_drop_fused(
pipeline()
.command(select_command().named_field("a", 1).named_field("b", 2).build())
.command(drop_command().field("b").build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).build())
.build(),
Struct::default().with_str("a", INT)
)]
#[case::select_multiple_lets_fused(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", 2).build())
.command(let_command().named_field("c", 3).build())
.build(),
pipeline()
.command(select_command()
.named_field("c", 3)
.named_field("b", 2)
.named_field("a", 1)
.build())
.build(),
Struct::default().with_str("c", INT).with_str("b", INT).with_str("a", INT)
)]
#[case::barrier_let_refs_select_field(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", add(field_ref("a"), 1)).build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).build())
.command(select_command()
.named_field("b", add(field_ref("a"), 1))
.named_field("a", field_ref("a"))
.build())
.build(),
Struct::default().with_str("b", INT).with_str("a", INT)
)]
#[case::barrier_let_refs_let_field(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", 2).build())
.command(let_command().named_field("c", add(field_ref("b"), 1)).build())
.build(),
pipeline()
.command(select_command()
.named_field("b", 2)
.named_field("a", 1)
.build())
.command(select_command()
.named_field("c", add(field_ref("b"), 1))
.named_field("b", field_ref("b"))
.named_field("a", field_ref("a"))
.build())
.build(),
Struct::default().with_str("c", INT).with_str("b", INT).with_str("a", INT)
)]
#[case::barrier_chained_dependencies(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", add(field_ref("a"), 1)).build())
.command(let_command().named_field("c", add(field_ref("b"), 1)).build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).build())
.command(select_command()
.named_field("b", add(field_ref("a"), 1))
.named_field("a", field_ref("a"))
.build())
.command(select_command()
.named_field("c", add(field_ref("b"), 1))
.named_field("b", field_ref("b"))
.named_field("a", field_ref("a"))
.build())
.build(),
Struct::default().with_str("c", INT).with_str("b", INT).with_str("a", INT)
)]
#[case::no_barrier_overwrite_without_ref(
pipeline()
.command(let_command().named_field("a", 1).build())
.command(let_command().named_field("a", 2).build())
.build(),
pipeline()
// Second LET's assignment wins (last write)
.command(select_command().named_field("a", 2).build())
.build(),
Struct::default().with_str("a", INT)
)]
#[case::barrier_self_reference(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("a", add(field_ref("a"), 1)).build())
.build(),
pipeline()
.command(select_command().named_field("a", 1).build())
.command(select_command().named_field("a", add(field_ref("a"), 1)).build())
.build(),
Struct::default().with_str("a", INT)
)]
#[case::no_barrier_independent_lets(
pipeline()
.command(select_command().named_field("a", 1).build())
.command(let_command().named_field("b", 2).build())
.command(let_command().named_field("c", 3).build())
.build(),
pipeline()
.command(select_command()
.named_field("c", 3)
.named_field("b", 2)
.named_field("a", 1)
.build())
.build(),
Struct::default().with_str("c", INT).with_str("b", INT).with_str("a", INT)
)]
#[case::three_lets_prepend_order(
pipeline()
.command(let_command().named_field("a", 1).build())
.command(let_command().named_field("b", 2).build())
.command(let_command().named_field("c", 3).build())
.build(),
pipeline()
.command(select_command()
.named_field("c", 3)
.named_field("b", 2)
.named_field("a", 1)
.build())
.build(),
Struct::default().with_str("c", INT).with_str("b", INT).with_str("a", INT)
)]
#[case::compound_let_preserved(
pipeline()
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.build())
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.build())
.build(),
pipeline()
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default().with_str("a", INT).with_str("b", INT).into())
)]
#[case::compound_let_after_explode_barrier(
pipeline()
// Setup: create nested struct and temp field
.command(let_command()
.named_field(
CompoundIdentifier::new("data".into(), "arr".into(), vec![]),
array().element(1).element(2).element(3),
)
.named_field("temp", 42)
.build())
// EXPLODE is a barrier
.command(hamelin_lib::tree::builder::explode_command()
.named_field("temp", field_ref("temp"))
.build())
// After barrier: LET with compound identifier into existing struct
.command(let_command()
.named_field(
CompoundIdentifier::new("data".into(), "arr".into(), vec![]),
field_ref("temp"),
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("data".into(), "arr".into(), vec![]),
array().element(1).element(2).element(3),
)
.named_field("temp", 42)
.build())
// EXPLODE barrier preserved
.command(hamelin_lib::tree::builder::explode_command()
.named_field("temp", field_ref("temp"))
.build())
// Second SELECT: compound assignment + temp passthrough, but NO `data` passthrough
.command(select_command()
.named_field(
CompoundIdentifier::new("data".into(), "arr".into(), vec![]),
field_ref("temp"),
)
.named_field("temp", field_ref("temp"))
.build())
.build(),
Struct::default()
.with_str("data", Struct::default().with_str("arr", INT).into())
.with_str("temp", INT)
)]
#[case::compound_let_sibling_preserves_existing(
pipeline()
// Setup: create struct with two fields
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET adds a new sibling field to existing struct
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "y".into(), vec![]),
3,
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: passthroughs in schema order, then new field at end
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "a"),
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "b"),
)
.named_field(
CompoundIdentifier::new("x".into(), "y".into(), vec![]),
3,
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default().with_str("a", INT).with_str("b", INT).with_str("y", INT).into())
)]
#[case::deep_sibling_preservation(
pipeline()
// Setup: create deeply nested struct with two fields
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
4,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
5,
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET overwrites one deep field
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
6,
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
4,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
5,
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: overwritten field + passthrough for sibling
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
6,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
hamelin_lib::tree::builder::field(
hamelin_lib::tree::builder::field(field_ref("x"), "c"),
"e"
),
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default()
.with_str("c", Struct::default()
.with_str("d", INT)
.with_str("e", INT)
.into())
.into())
)]
#[case::parent_then_child_assignment(
pipeline()
// Setup: create struct via struct literal
.command(let_command()
.named_field(
"x",
hamelin_lib::tree::builder::struct_literal()
.field("a", 1)
.field("b", 2),
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET assigns a child field
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
3,
)
.build())
.build(),
pipeline()
// First SELECT: struct literal emitted directly
.command(select_command()
.named_field(
"x",
hamelin_lib::tree::builder::struct_literal()
.field("a", 1)
.field("b", 2),
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: child assignment + leaf passthrough for sibling
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
3,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "b"),
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default().with_str("a", INT).with_str("b", INT).into())
)]
#[case::child_then_parent_assignment(
pipeline()
// Setup: create child field
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET assigns struct literal - replaces x entirely
.command(let_command()
.named_field(
"x",
hamelin_lib::tree::builder::struct_literal().field("y", 2),
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: struct literal emitted directly
.command(select_command()
.named_field(
"x",
hamelin_lib::tree::builder::struct_literal().field("y", 2),
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default().with_str("y", INT).into())
)]
#[case::mixed_deep_and_new_field(
pipeline()
// Setup: create struct with nested structure
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
10,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
20,
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET assigns new sibling + overwrites deep field
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "y".into(), vec![]),
3,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
4,
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
10,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
20,
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: schema order x.a, x.b, x.c.d, x.c.e, x.y
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "a"),
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "b"),
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["d".into()]),
4,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec!["e".into()]),
hamelin_lib::tree::builder::field(
hamelin_lib::tree::builder::field(field_ref("x"), "c"),
"e"
),
)
.named_field(
CompoundIdentifier::new("x".into(), "y".into(), vec![]),
3,
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default()
.with_str("a", INT)
.with_str("b", INT)
.with_str("c", Struct::default().with_str("d", INT).with_str("e", INT).into())
.with_str("y", INT)
.into())
)]
#[case::drop_and_let_fused(
pipeline()
// Setup: create struct with three fields
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec![]),
3,
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: DROP one field, then LET adds another
.command(drop_command()
.field(CompoundIdentifier::new("x".into(), "b".into(), vec![]))
.build())
.command(let_command()
.named_field(
CompoundIdentifier::new("x".into(), "d".into(), vec![]),
4,
)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
1,
)
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
2,
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec![]),
3,
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: schema order x.a, x.c, x.d
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "a".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "a"),
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "c"),
)
.named_field(
CompoundIdentifier::new("x".into(), "d".into(), vec![]),
4,
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default()
.with_str("a", INT)
.with_str("c", INT)
.with_str("d", INT)
.into())
)]
#[case::drop_child_from_struct_binding(
pipeline()
// Setup: create struct via struct literal
.command(let_command()
.named_field(
"x",
struct_literal()
.field("a", 1)
.field("b", 2)
.field("c", 3),
)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: DROP a child field
.command(drop_command()
.field(CompoundIdentifier::new("x".into(), "a".into(), vec![]))
.build())
.build(),
pipeline()
// First SELECT: struct literal emitted directly
.command(select_command()
.named_field(
"x",
struct_literal()
.field("a", 1)
.field("b", 2)
.field("c", 3),
)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: passthroughs for remaining children (x.a dropped)
.command(select_command()
.named_field(
CompoundIdentifier::new("x".into(), "b".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "b"),
)
.named_field(
CompoundIdentifier::new("x".into(), "c".into(), vec![]),
hamelin_lib::tree::builder::field(field_ref("x"), "c"),
)
.build())
.build(),
Struct::default()
.with_str("x", Struct::default().with_str("b", INT).with_str("c", INT).into())
)]
#[case::no_overlap_unrelated_fields(
pipeline()
// Setup: create one field
.command(let_command()
.named_field("a", 1)
.build())
// WHERE is a barrier
.command(hamelin_lib::tree::builder::where_command(true).build())
// After barrier: LET adds unrelated field
.command(let_command()
.named_field("b", 2)
.build())
.build(),
pipeline()
// First SELECT (before barrier)
.command(select_command()
.named_field("a", 1)
.build())
// WHERE barrier preserved
.command(hamelin_lib::tree::builder::where_command(true).build())
// Second SELECT: new field + passthrough for existing
.command(select_command()
.named_field("b", 2)
.named_field("a", field_ref("a"))
.build())
.build(),
Struct::default().with_str("b", INT).with_str("a", INT)
)]
/// Type-checks `input`, runs `fuse_projections` on it, and checks both that
/// the fused AST equals the type-checked `expected` AST and that the
/// resulting output schema equals `expected_output_schema`.
fn test_fuse_projections(
    #[case] input: Pipeline,
    #[case] expected: Pipeline,
    #[case] expected_output_schema: Struct,
) {
    let typed_input = Arc::new(type_check(input).output);
    let typed_expected = type_check(expected).output;

    let mut ctx = StatementTranslationContext::default();
    let fused = fuse_projections(typed_input, &mut ctx).unwrap();

    assert_eq!(fused.ast, typed_expected.ast);
    assert_eq!(
        fused.environment().as_struct().clone(),
        expected_output_schema
    );
}
}