hamelin_translation 0.9.2

Lowering and IR for Hamelin query language
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
//! Pipeline pass: WINDOW compound identifier normalization.
//!
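//! Lowers compound identifiers in WINDOW commands to flat identifiers + SET/DROP restoration.
//! Partition-by expressions with compound identifiers are projected into flat columns
//! *before* the WINDOW, so the WINDOW's group_by only contains pure column references.
//! Simple partition-by assignments whose expression is not a reference to the same
//! name (e.g. `BY grp = a`) are likewise projected via SET before the WINDOW.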
//!
//! Example:
//! ```text
//! FROM events | WINDOW stats.running = sum(value) BY info.group
//! ```
//! becomes:
//! ```text
//! FROM events | SET __normalize_window_1 = info.group
//!             | WINDOW __normalize_window_0 = sum(value) BY __normalize_window_1
//!             | SET stats.running = __normalize_window_0
//!             | DROP __normalize_window_0
//!             | SET info.group = __normalize_window_1
//!             | DROP __normalize_window_1
//! ```

use std::sync::Arc;

use hamelin_lib::err::TranslationError;
use hamelin_lib::tree::{
    ast::{
        command::Command,
        expression::{ExpressionKind, FieldReference},
        identifier::{Identifier, SimpleIdentifier},
    },
    builder::{self, drop_command, field_ref, set_command, window_command},
    typed_ast::{
        clause::Projections,
        command::{TypedCommand, TypedCommandKind, TypedWindowCommand},
        context::StatementTranslationContext,
        pipeline::TypedPipeline,
    },
};

use super::super::compound_lowering::{lower_compound_assignments, UniqueNameGenerator};

/// Rewrite every WINDOW command in `pipeline` that still needs normalization.
///
/// Contract: `Arc<TypedPipeline> -> Result<Arc<TypedPipeline>, ...>`
///
/// When no command satisfies [`window_needs_normalization`], the incoming `Arc` is
/// handed back untouched. Otherwise a new AST is assembled from the per-command
/// results of [`normalize_command`] and type-checked again using `ctx`.
///
/// # Errors
///
/// Propagates the error from `pipeline.valid_ref()` and any error returned while
/// normalizing an individual command.
pub fn normalize_window(
    pipeline: Arc<TypedPipeline>,
    ctx: &mut StatementTranslationContext,
) -> Result<Arc<TypedPipeline>, Arc<TranslationError>> {
    // Fast path: nothing to rewrite, so return the original pipeline as-is.
    let any_window_to_rewrite = pipeline
        .valid_ref()?
        .commands
        .iter()
        .any(window_needs_normalization);
    if !any_window_to_rewrite {
        return Ok(pipeline);
    }

    let valid = pipeline.valid_ref()?;

    // One generator for the whole pipeline, shared by all WINDOW commands.
    let mut name_gen = UniqueNameGenerator::new("__normalize_window");

    // Each original command expands to one or more replacement commands.
    let mut rewritten = builder::pipeline();
    for original in valid.commands.iter() {
        let replacements = normalize_command(original, &mut name_gen)?;
        rewritten = replacements
            .into_iter()
            .fold(rewritten, |acc, replacement| acc.command(replacement));
    }

    // Keep the span of the original pipeline on the rebuilt AST, then re-typecheck.
    let rebuilt_ast = Arc::new(rewritten.build().at(pipeline.ast.span));
    let retyped = TypedPipeline::from_ast_with_context(rebuilt_ast, ctx);
    Ok(Arc::new(retyped))
}

/// Decide whether `cmd` is a WINDOW command that this pass has to rewrite.
///
/// Non-WINDOW commands always yield `false`. A WINDOW command yields `true` when
/// any of the following holds:
/// - a projection assignment has a compound identifier,
/// - a group_by assignment has a compound identifier,
/// - a group_by assignment has a non-identity expression (e.g. `BY grp = a`,
///   where the expression differs from the identifier name).
fn window_needs_normalization(cmd: &Arc<TypedCommand>) -> bool {
    match &cmd.kind {
        TypedCommandKind::Window(window) => {
            has_compound_identifiers(&window.projections)
                || has_compound_identifiers(&window.group_by)
                || has_non_identity_group_by(&window.group_by)
        }
        _ => false,
    }
}

/// Report whether at least one assignment targets a compound identifier.
///
/// Assignments whose identifier is not valid (`valid_ref()` fails) count as
/// "not compound".
fn has_compound_identifiers(projections: &Projections) -> bool {
    projections.assignments.iter().any(|assignment| {
        matches!(
            assignment.identifier.valid_ref(),
            Ok(Identifier::Compound(_))
        )
    })
}

/// Report whether any simple-named group_by assignment is not an identity mapping.
///
/// An assignment is an identity mapping when its expression is a field reference
/// whose (valid) name equals the assignment's own simple identifier. Assignments
/// with compound or invalid identifiers are skipped here.
fn has_non_identity_group_by(projections: &Projections) -> bool {
    for assignment in projections.assignments.iter() {
        // Only simple identifiers are considered by this check.
        let Ok(Identifier::Simple(simple)) = assignment.identifier.valid_ref() else {
            continue;
        };

        let is_identity = match &assignment.expression.ast.kind {
            ExpressionKind::FieldReference(FieldReference { field_name }) => field_name
                .valid_ref()
                .map(|n| n == simple)
                .unwrap_or(false),
            _ => false,
        };

        if !is_identity {
            return true;
        }
    }
    false
}

/// Expand one command into its normalized replacement(s).
///
/// A WINDOW command for which [`window_needs_normalization`] holds is handed to
/// [`transform_window`]; every other command is returned as a single-element
/// vector holding a clone of its original AST handle.
fn normalize_command(
    cmd: &Arc<TypedCommand>,
    name_gen: &mut UniqueNameGenerator,
) -> Result<Vec<Arc<Command>>, Arc<TranslationError>> {
    match &cmd.kind {
        TypedCommandKind::Window(window_cmd) if window_needs_normalization(cmd) => {
            transform_window(window_cmd, cmd, name_gen)
        }
        _ => Ok(vec![cmd.ast.clone()]),
    }
}

/// Rewrite one WINDOW command into a command sequence without compound identifiers.
///
/// The emitted sequence is, in order:
/// 1. SET commands that materialize partition-by expressions as flat columns,
/// 2. the rewritten WINDOW command (sort expressions and WITHIN carried over),
/// 3. the restore commands produced by `lower_compound_assignments` for projections,
/// 4. SET/DROP pairs that restore compound partition-by paths and drop the temporaries.
///
/// Partition-by handling per assignment:
/// - compound identifier: a fresh flat name is generated, assigned before the WINDOW,
///   used as the WINDOW's group_by column, then restored to the compound path and dropped;
/// - simple identifier referencing a field of the same name: used as-is;
/// - simple identifier with any other expression: assigned via SET before the WINDOW,
///   then referenced by name.
///
/// Projecting partition-by values ahead of the WINDOW keeps the WINDOW's group_by to
/// pure column references, so downstream translators (SQL, DataFusion) never see
/// partition_by identifiers in the WINDOW's output_schema that don't exist as input columns.
///
/// # Errors
///
/// Returns the error from `valid_ref()` if a group_by identifier is invalid.
fn transform_window(
    window_cmd: &TypedWindowCommand,
    cmd: &TypedCommand,
    name_gen: &mut UniqueNameGenerator,
) -> Result<Vec<Arc<Command>>, Arc<TranslationError>> {
    // Projections are lowered first, so they draw from `name_gen` before partition-by does.
    let (projection_assignments, projection_restores) =
        lower_compound_assignments(&window_cmd.projections, name_gen, &cmd.input_schema);

    let mut pre_window: Vec<Command> = Vec::new();
    let mut partition_columns: Vec<SimpleIdentifier> = Vec::new();
    let mut partition_restores: Vec<Command> = Vec::new();

    for assignment in &window_cmd.group_by.assignments {
        match assignment.identifier.valid_ref()? {
            Identifier::Simple(simple) => {
                let refers_to_itself = match &assignment.expression.ast.kind {
                    ExpressionKind::FieldReference(FieldReference { field_name }) => field_name
                        .valid_ref()
                        .map(|n| n == simple)
                        .unwrap_or(false),
                    _ => false,
                };

                // Only a non-identity expression needs to be materialized up front.
                if !refers_to_itself {
                    pre_window.push(
                        set_command()
                            .named_field(simple.clone(), assignment.expression.ast.as_ref().clone())
                            .build(),
                    );
                }
                partition_columns.push(simple.clone());
            }
            Identifier::Compound(compound) => {
                let temp = name_gen.next(&cmd.input_schema);

                // Before the WINDOW: temp = <partition expression>
                pre_window.push(
                    set_command()
                        .named_field(temp.clone(), assignment.expression.ast.as_ref().clone())
                        .build(),
                );
                // After the WINDOW: compound path = temp, then drop temp
                partition_restores.push(
                    set_command()
                        .named_field(
                            Into::<Identifier>::into(compound.clone()),
                            field_ref(temp.as_str()),
                        )
                        .build(),
                );
                partition_restores.push(drop_command().field(temp.clone()).build());
                partition_columns.push(temp);
            }
        }
    }

    // Assemble the WINDOW: projections, then group_by column refs, then sort, then WITHIN.
    let mut window = projection_assignments
        .into_iter()
        .fold(window_command().at(cmd.ast.span), |acc, (id, expr)| {
            acc.named_field(id, expr)
        });
    window = partition_columns.iter().fold(window, |acc, column| {
        acc.group_by(column.clone(), field_ref(column.as_str()))
    });
    for sort_expr in &window_cmd.sort_by {
        window = window.sort_expr(sort_expr.ast.as_ref().clone());
    }
    let window = match &window_cmd.within {
        Some(within) => window.within(within.ast.clone()),
        None => window,
    };

    // SET-befores + WINDOW + projection restores + partition_by restores
    let commands = pre_window
        .into_iter()
        .chain(std::iter::once(window.build()))
        .chain(projection_restores)
        .chain(partition_restores)
        .map(Arc::new)
        .collect();
    Ok(commands)
}

#[cfg(test)]
mod tests {
    // Table-driven tests for `normalize_window`. Each `#[case]` supplies three values:
    // the input pipeline, the pipeline expected after normalization, and the expected
    // output schema of the normalized pipeline.
    use super::*;
    use hamelin_lib::type_check;
    use hamelin_lib::{
        tree::ast::expression::IntervalUnit,
        tree::{
            ast::{identifier::CompoundIdentifier, pipeline::Pipeline},
            builder::{
                call, drop_command, field_ref, pipeline, select_command, set_command, sort_command,
                window_command, IntervalLiteralBuilder,
            },
        },
        types::{struct_type::Struct, INT},
    };
    use pretty_assertions::assert_eq;
    use rstest::rstest;
    use std::sync::Arc;

    #[rstest]
    // Case 1: No WINDOW commands - passes through unchanged
    #[case::no_window_passthrough(
        pipeline()
            .command(select_command().named_field("a", 1).named_field("b", 2).build())
            .build(),
        pipeline()
            .command(select_command().named_field("a", 1).named_field("b", 2).build())
            .build(),
        Struct::default().with_str("a", INT).with_str("b", INT)
    )]
    // Case 2: WINDOW with simple identifiers - passes through unchanged
    #[case::window_simple_ids_unchanged(
        pipeline()
            .command(select_command().named_field("value", 10).named_field("category", 1).build())
            .command(window_command()
                .named_field("running", call("sum").arg(field_ref("value")))
                .group_by("category", field_ref("category"))
                .build())
            .build(),
        pipeline()
            .command(select_command().named_field("value", 10).named_field("category", 1).build())
            .command(window_command()
                .named_field("running", call("sum").arg(field_ref("value")))
                .group_by("category", field_ref("category"))
                .build())
            .build(),
        // Schema order: projections first, then partition_by, then parent fields
        // WINDOW binds partition_by fields directly, so category comes before value
        Struct::default()
            .with_str("running", INT)
            .with_str("category", INT)
            .with_str("value", INT)
    )]
    // Case 3: WINDOW with compound identifier in projection → WINDOW + SET + DROP
    #[case::window_compound_projection(
        pipeline()
            .command(select_command().named_field("value", 10).named_field("category", 1).build())
            .command(window_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    call("sum").arg(field_ref("value"))
                )
                .group_by("category", field_ref("category"))
                .build())
            .build(),
        pipeline()
            .command(select_command().named_field("value", 10).named_field("category", 1).build())
            .command(window_command()
                .named_field("__normalize_window_0", call("sum").arg(field_ref("value")))
                .group_by("category", field_ref("category"))
                .build())
            .command(set_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    field_ref("__normalize_window_0")
                )
                .build())
            .command(drop_command().field("__normalize_window_0").build())
            .build(),
        // Schema order after SET and DROP:
        // SET prepends stats, WINDOW had {__normalize_window_0, category, value}, DROP removes temp
        // Result: {stats.running, category, value}
        Struct::default()
            .with_str("stats", Struct::default().with_str("running", INT).into())
            .with_str("category", INT)
            .with_str("value", INT)
    )]
    // Case 4: WINDOW with compound identifier in group_by
    // The compound group_by is projected before the WINDOW via SET, then restored after.
    #[case::window_compound_group_by(
        pipeline()
            .command(select_command().named_field("value", 10).named_field("cat", 1).build())
            .command(window_command()
                .named_field("running", call("sum").arg(field_ref("value")))
                .group_by(
                    CompoundIdentifier::new("group".into(), "key".into(), vec![]),
                    field_ref("cat")
                )
                .build())
            .build(),
        pipeline()
            .command(select_command().named_field("value", 10).named_field("cat", 1).build())
            // SET-before: project compound partition_by into flat column
            .command(set_command()
                .named_field("__normalize_window_0", field_ref("cat"))
                .build())
            // WINDOW with simple column ref in group_by
            .command(window_command()
                .named_field("running", call("sum").arg(field_ref("value")))
                .group_by("__normalize_window_0", field_ref("__normalize_window_0"))
                .build())
            // SET-after: restore compound path
            .command(set_command()
                .named_field(
                    CompoundIdentifier::new("group".into(), "key".into(), vec![]),
                    field_ref("__normalize_window_0")
                )
                .build())
            .command(drop_command().field("__normalize_window_0").build())
            .build(),
        // Schema: SET-before adds __normalize_window_0, WINDOW adds running,
        // SET-after prepends group.key, DROP removes temp
        Struct::default()
            .with_str(
                "group",
                Struct::default()
                    .with_str("key", INT)
                    .into(),
            )
            .with_str("running", INT)
            .with_str("value", INT)
            .with_str("cat", INT)
    )]
    // Case 5: WINDOW with compound identifiers in both projection and group_by
    // Projection compound → temp inside WINDOW + SET/DROP after
    // Group_by compound → SET before WINDOW + simple ref in WINDOW + SET/DROP after
    // The projection temp is numbered 0 and the group_by temp is numbered 1.
    #[case::window_compound_both(
        pipeline()
            .command(select_command().named_field("value", 10).named_field("cat", 1).build())
            .command(window_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    call("sum").arg(field_ref("value"))
                )
                .group_by(
                    CompoundIdentifier::new("group".into(), "key".into(), vec![]),
                    field_ref("cat")
                )
                .build())
            .build(),
        pipeline()
            .command(select_command().named_field("value", 10).named_field("cat", 1).build())
            // SET-before: project compound group_by into flat column
            .command(set_command()
                .named_field("__normalize_window_1", field_ref("cat"))
                .build())
            // WINDOW: projection temp + simple column ref group_by
            .command(window_command()
                .named_field("__normalize_window_0", call("sum").arg(field_ref("value")))
                .group_by("__normalize_window_1", field_ref("__normalize_window_1"))
                .build())
            // Projection restore
            .command(set_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    field_ref("__normalize_window_0")
                )
                .build())
            .command(drop_command().field("__normalize_window_0").build())
            // Group_by restore
            .command(set_command()
                .named_field(
                    CompoundIdentifier::new("group".into(), "key".into(), vec![]),
                    field_ref("__normalize_window_1")
                )
                .build())
            .command(drop_command().field("__normalize_window_1").build())
            .build(),
        Struct::default()
            .with_str(
                "group",
                Struct::default()
                    .with_str("key", INT)
                    .into(),
            )
            .with_str("stats", Struct::default().with_str("running", INT).into())
            .with_str("value", INT)
            .with_str("cat", INT)
    )]
    // Case 6: WINDOW with compound projection and WITHIN preserved
    #[case::window_compound_within(
        pipeline()
            .command(select_command()
                .named_field("value", 10)
                .named_field("category", 1)
                .named_field("timestamp", 5)
                .build())
            .command(window_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    call("sum").arg(field_ref("value"))
                )
                .group_by("category", field_ref("category"))
                .sort(sort_command().by(field_ref("timestamp")))
                .within(IntervalLiteralBuilder::new(-5, IntervalUnit::Hour))
                .build())
            .build(),
        pipeline()
            .command(select_command()
                .named_field("value", 10)
                .named_field("category", 1)
                .named_field("timestamp", 5)
                .build())
            .command(window_command()
                .named_field("__normalize_window_0", call("sum").arg(field_ref("value")))
                .group_by("category", field_ref("category"))
                .sort(sort_command().by(field_ref("timestamp")))
                .within(IntervalLiteralBuilder::new(-5, IntervalUnit::Hour))
                .build())
            .command(set_command()
                .named_field(
                    CompoundIdentifier::new("stats".into(), "running".into(), vec![]),
                    field_ref("__normalize_window_0")
                )
                .build())
            .command(drop_command().field("__normalize_window_0").build())
            .build(),
        // Schema order: stats prepended by SET, then WINDOW's {category}, then parent's {value, timestamp}
        Struct::default()
            .with_str("stats", Struct::default().with_str("running", INT).into())
            .with_str("category", INT)
            .with_str("value", INT)
            .with_str("timestamp", INT)
    )]
    // Shared test body: type-checks both pipelines, runs `normalize_window` on the
    // input, and compares the resulting AST and output schema against the expectations.
    fn test_normalize_window(
        #[case] input: Pipeline,
        #[case] expected: Pipeline,
        #[case] expected_output_schema: Struct,
    ) {
        // Both pipelines go through `type_check`; only the `.output` of each is used.
        let input_typed = type_check(input).output;
        let expected_typed = type_check(expected).output;

        // A fresh default context per case; `unwrap` fails the test if the pass errors.
        let mut ctx = StatementTranslationContext::default();
        let result = normalize_window(Arc::new(input_typed), &mut ctx).unwrap();

        // Compare ASTs
        assert_eq!(result.ast, expected_typed.ast);

        // Verify output schema
        let result_schema = result.environment().as_struct().clone();
        assert_eq!(result_schema, expected_output_schema);
    }
}