hamelin_sql 0.7.1

SQL generation utilities for Hamelin query language
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
//! Command translation from IR to SQL AST.
//!
//! This module translates `IRCommand` variants to SQL query modifications.
//! Each command type (FROM, WHERE, SELECT, etc.) has its own translation logic
//! that builds up the SQL query incrementally.
//!
//! # Dialect Extensibility
//!
//! The `CommandTranslator` trait allows dialects to override specific command
//! translations while inheriting defaults for others. For example, EXPLODE
//! uses different SQL patterns across dialects:
//! - Trino: `CROSS JOIN UNNEST(array_col) AS t(col_name)`
//! - DuckDB: `unnest(array_col)` as a table function
//!
//! Dialects implement `CommandTranslator` and override only the methods they need.

use hamelin_lib::err::{TranslationError, TranslationErrors};
use hamelin_lib::func::def::{FunctionTranslationContext, SpecialPosition};
use hamelin_lib::sql::expression::identifier::{
    Identifier as SQLIdentifier, SimpleIdentifier as SQLSimpleIdentifier,
};
use hamelin_lib::sql::expression::literal::{ColumnReference, IntegerLiteral};
use hamelin_lib::sql::expression::{Direction, OrderByExpression};
use hamelin_lib::sql::query::projection::{Binding, ColumnProjection, Projection};
use hamelin_lib::sql::query::set::SetOperation;
use hamelin_lib::sql::query::window::{
    FrameBoundary, FrameType, WindowFrame as SQLWindowFrame, WindowReference, WindowSpecification,
};
use hamelin_lib::sql::query::{
    Join, JoinClause, JoinType as SQLJoinType, SQLQuery, SQLQueryExpression, SubQuery,
    TableExpression, TableReference,
};
use hamelin_lib::tree::ast::clause::SortOrder;
use hamelin_lib::tree::ast::identifier::SimpleIdentifier;

use hamelin_translation::{
    IRAggCommand, IRCommand, IRCommandKind, IRExplodeCommand, IRFromCommand, IRInput,
    IRJoinCommand, IRLimitCommand, IRSelectCommand, IRSortCommand, IRSortExpression,
    IRWhereCommand, IRWindowCommand, JoinType as IRJoinType,
};
use hamelin_translation::{RangeBound, RowBound, WindowFrame};

use crate::context::TranslationContext;
use crate::expr::translate_expression;

use hamelin_lib::tree::typed_ast::environment::TypeEnvironment;

/// Result type returned by every command translation method.
///
/// `Ok` carries the `SQLQuery` produced for the command; `Err` carries the
/// `TranslationErrors` describing why translation failed.
pub type CommandTranslationResult = Result<SQLQuery, TranslationErrors>;

/// The query built so far, handed to each command translation method.
///
/// `None` at the start of a pipeline (no command has produced SQL yet),
/// `Some(query)` once earlier commands have been translated.
pub type QueryInput = Option<SQLQuery>;

/// Produce an explicit pass-through projection list for `schema`.
///
/// Every field of the schema becomes a `name AS name` binding whose expression
/// is a plain column reference to that same name. Projections are emitted in
/// the iteration order of `schema.as_struct()`.
fn build_projections_from_schema(schema: &TypeEnvironment) -> Vec<Projection> {
    let mut projections: Vec<Projection> = Vec::new();
    for (field_name, _field_type) in schema.as_struct().iter() {
        let alias: SQLSimpleIdentifier = field_name.clone().into();
        let column = ColumnReference::new(alias.clone().into());
        projections.push(Binding::new(alias, column.into()).into());
    }
    projections
}

/// Trait for translating IR commands to SQL.
///
/// This trait provides default implementations for all command types.
/// Dialects can override specific methods to customize SQL generation.
pub trait CommandTranslator {
    /// Route an IR command to the translation method for its kind.
    ///
    /// This is the single entry point used to translate any `IRCommand`. Each
    /// `IRCommandKind` variant is forwarded, together with the translation
    /// context, the enclosing command and the query built so far, to the
    /// matching `translate_*` method.
    ///
    /// Dialects normally override the individual `translate_*` methods; override
    /// this method only when the dispatch itself has to change.
    fn translate_command(
        &self,
        ctx: &mut TranslationContext,
        cmd: &IRCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        let kind = &cmd.kind;
        match kind {
            IRCommandKind::From(inner) => self.translate_from(ctx, cmd, inner, query),
            IRCommandKind::Where(inner) => self.translate_where(ctx, cmd, inner, query),
            IRCommandKind::Select(inner) => self.translate_select(ctx, cmd, inner, query),
            IRCommandKind::Sort(inner) => self.translate_sort(ctx, cmd, inner, query),
            IRCommandKind::Limit(inner) => self.translate_limit(ctx, cmd, inner, query),
            IRCommandKind::Agg(inner) => self.translate_agg(ctx, cmd, inner, query),
            IRCommandKind::Window(inner) => self.translate_window(ctx, cmd, inner, query),
            IRCommandKind::Join(inner) => self.translate_join(ctx, cmd, inner, query),
            IRCommandKind::Explode(inner) => self.translate_explode(ctx, cmd, inner, query),
        }
    }

    /// Translate a FROM command.
    ///
    /// FROM can have one or more inputs:
    /// - Single input: direct table reference or CTE reference
    /// - Multiple inputs: UNION ALL of all inputs
    fn translate_from(
        &self,
        _ctx: &mut TranslationContext,
        cmd: &IRCommand,
        from_cmd: &IRFromCommand,
        _query: QueryInput,
    ) -> CommandTranslationResult {
        if from_cmd.inputs.is_empty() {
            return Err(TranslationError::msg(cmd, "FROM command has no inputs").single());
        }

        let projections = build_projections_from_schema(&cmd.output_schema);

        if from_cmd.inputs.len() == 1 {
            // Single input - just create a query with FROM
            let table_expr = translate_input(&from_cmd.inputs[0]);
            Ok(SQLQuery::default().from(table_expr).select(projections))
        } else {
            // Multiple inputs - UNION ALL
            // Each input becomes a SELECT <cols> FROM input query
            let mut queries: Vec<SQLQueryExpression> = from_cmd
                .inputs
                .iter()
                .map(|input| {
                    let table_expr = translate_input(input);
                    let projs = build_projections_from_schema(&cmd.output_schema);
                    let query = SQLQuery::default().from(table_expr).select(projs);
                    query.into()
                })
                .collect();

            // Build the UNION ALL chain left-to-right
            let first = queries.remove(0);
            let result = queries
                .into_iter()
                .fold(first, |acc, next| SetOperation::new(acc, next).into());

            // Wrap in a subquery to allow further operations
            match result {
                SQLQueryExpression::SetOperation(set_op) => {
                    let subquery = SubQuery::new(set_op.into());
                    Ok(SQLQuery::default()
                        .from(subquery.into())
                        .select(projections))
                }
                SQLQueryExpression::SQLQuery(q) => Ok(q),
            }
        }
    }

    /// Translate a WHERE command.
    ///
    /// Adds a filter predicate to the query.
    ///
    /// With no input query the predicate is attached to an otherwise empty
    /// query. With an input query, that query is wrapped in a subquery first so
    /// the predicate can reference column aliases introduced by the previous
    /// SELECT (SQL does not allow referencing aliases in the same SELECT's
    /// WHERE); the outer query projects every column of the command's output
    /// schema by name.
    ///
    /// An ORDER BY on the input is carried onto the outer query, because an
    /// ORDER BY inside a subquery is not guaranteed to order the outer result.
    /// When the input also has a LIMIT, the inner ORDER BY is kept as well,
    /// since it determines which rows the LIMIT selects.
    ///
    /// # Errors
    ///
    /// Returns the error produced by translating the predicate expression.
    fn translate_where(
        &self,
        ctx: &mut TranslationContext,
        cmd: &IRCommand,
        where_cmd: &IRWhereCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        let predicate_sql = translate_expression(ctx, &where_cmd.predicate)?;
        match query {
            Some(mut inner) => {
                // Filtering preserves relative row order, so the ordering of the
                // input must survive on the outer query. Keep it inside too when
                // a LIMIT depends on it.
                let order_by = if inner.limit.is_some() {
                    inner.order_by.clone()
                } else {
                    inner.order_by.take()
                };

                // Wrap in subquery to allow referencing column aliases
                let subquery = SubQuery::new(inner.into());

                // Build explicit projections for all output columns
                let projections: Vec<Projection> = cmd
                    .output_schema
                    .as_struct()
                    .keys()
                    .map(|name| {
                        let sql_name: SQLSimpleIdentifier = name.clone().into();
                        ColumnProjection::new(sql_name.into()).into()
                    })
                    .collect();

                let mut result = SQLQuery::default()
                    .from(subquery.into())
                    .select(projections)
                    .where_(predicate_sql);

                if let Some(order_by) = order_by {
                    result = result.order_by(order_by);
                }

                Ok(result)
            }
            None => Ok(SQLQuery::default().where_(predicate_sql)),
        }
    }

    /// Translate a SELECT command.
    ///
    /// Every assignment becomes an `expr AS name` projection.
    ///
    /// If there is no input query the projections are applied directly
    /// (expression-only queries). Otherwise the input is wrapped in a subquery
    /// and the projections are applied on top of it.
    ///
    /// An ORDER BY on the input is carried onto the outer query, because an
    /// ORDER BY inside a subquery is not guaranteed to order the outer result.
    /// If the input also has a LIMIT, the ORDER BY is additionally kept on the
    /// inner query: removing it there would change which rows the LIMIT
    /// selects (e.g. `SORT x | LIMIT 10 | SELECT ...` must still take the top
    /// 10 rows by `x`).
    ///
    /// # Errors
    ///
    /// Returns the error produced by translating any assignment expression.
    fn translate_select(
        &self,
        ctx: &mut TranslationContext,
        _cmd: &IRCommand,
        select_cmd: &IRSelectCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        // Translate all assignments to projections
        let mut projections = Vec::with_capacity(select_cmd.assignments.len());
        for assignment in &select_cmd.assignments {
            let expr_sql = translate_expression(ctx, &assignment.expression)?;
            let name: SQLSimpleIdentifier = assignment.identifier.clone().into();
            projections.push(Binding::new(name, expr_sql).into());
        }

        match query {
            None => Ok(SQLQuery::default().select(projections)),
            Some(mut inner) => {
                // The outer query always needs the ordering. The inner query
                // only needs it when a LIMIT relies on it to pick rows; without
                // a LIMIT it is moved out instead of duplicated.
                let order_by = if inner.limit.is_some() {
                    inner.order_by.clone()
                } else {
                    inner.order_by.take()
                };

                let subquery = SubQuery::new(inner.into());
                let mut result = SQLQuery::default()
                    .from(subquery.into())
                    .select(projections);

                if let Some(order_by) = order_by {
                    result = result.order_by(order_by);
                }

                Ok(result)
            }
        }
    }

    /// Translate a SORT command.
    ///
    /// Adds ORDER BY clause to the query.
    fn translate_sort(
        &self,
        ctx: &mut TranslationContext,
        _cmd: &IRCommand,
        sort_cmd: &IRSortCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        let order_bys = translate_sort_expressions(ctx, &sort_cmd.sort_by)?;
        match query {
            Some(q) => Ok(q.order_by(order_bys)),
            None => Ok(SQLQuery::default().order_by(order_bys)),
        }
    }

    /// Translate a LIMIT command.
    ///
    /// Picks the query that should receive the LIMIT, then attaches the row
    /// count to it as an integer literal:
    /// - no input query: a fresh, empty query;
    /// - input without a LIMIT: the input itself;
    /// - input that already has a LIMIT: the input wrapped in a subquery that
    ///   re-projects the command's output schema, so both limits are kept.
    fn translate_limit(
        &self,
        _ctx: &mut TranslationContext,
        cmd: &IRCommand,
        limit_cmd: &IRLimitCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        let target = match query {
            None => SQLQuery::default(),
            Some(unlimited) if unlimited.limit.is_none() => unlimited,
            Some(limited) => {
                // Nest the existing LIMIT so the new one does not replace it.
                let nested = SubQuery::new(limited.into());
                SQLQuery::default()
                    .from(nested.into())
                    .select(build_projections_from_schema(&cmd.output_schema))
            }
        };

        Ok(target.limit(IntegerLiteral::from_int(limit_cmd.count as i64).into()))
    }

    /// Translate an AGG command.
    ///
    /// Creates a GROUP BY query with aggregate projections.
    fn translate_agg(
        &self,
        ctx: &mut TranslationContext,
        _cmd: &IRCommand,
        agg_cmd: &IRAggCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        // Build the function translation context for aggregates
        let order_by = translate_sort_expressions(ctx, &agg_cmd.sort_by)?;
        let saved_fctx = std::mem::replace(
            &mut ctx.fctx,
            FunctionTranslationContext::default()
                .with_special_allowed(SpecialPosition::Agg)
                .with_order_by(order_by),
        );

        // Translate group_by expressions (these become both GROUP BY and projections)
        let mut group_by_exprs = Vec::with_capacity(agg_cmd.group_by.len());
        let mut projections = Vec::with_capacity(agg_cmd.group_by.len() + agg_cmd.aggregates.len());

        for assignment in &agg_cmd.group_by {
            let expr_sql = translate_expression(ctx, &assignment.expression)?;
            group_by_exprs.push(expr_sql.clone());
            let name: SQLSimpleIdentifier = assignment.identifier.clone().into();
            projections.push(Binding::new(name, expr_sql).into());
        }

        // Translate aggregate expressions
        for assignment in &agg_cmd.aggregates {
            let expr_sql = translate_expression(ctx, &assignment.expression)?;
            let name: SQLSimpleIdentifier = assignment.identifier.clone().into();
            projections.push(Binding::new(name, expr_sql).into());
        }

        // Restore the original fctx
        ctx.fctx = saved_fctx;

        // Build the final query
        let mut result = SQLQuery::default().select(projections);
        if !group_by_exprs.is_empty() {
            result = result.group_by(group_by_exprs);
        }
        match query {
            Some(q) => Ok(result.from(SubQuery::new(q.into()).into())),
            None => Ok(result),
        }
    }

    /// Translate a WINDOW command.
    ///
    /// Creates window function projections with OVER clause.
    ///
    /// WINDOW extends the input with new columns (unlike AGG which replaces).
    /// So we need to pass through all input columns AND add the window projections.
    fn translate_window(
        &self,
        ctx: &mut TranslationContext,
        cmd: &IRCommand,
        window_cmd: &IRWindowCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        // Build the window specification
        let mut partition_by = Vec::with_capacity(window_cmd.partition_by.len());
        for expr in &window_cmd.partition_by {
            partition_by.push(translate_expression(ctx, expr)?);
        }

        let order_by = translate_sort_expressions(ctx, &window_cmd.sort_by)?;

        let frame = window_cmd
            .frame
            .as_ref()
            .map(|f| translate_window_frame(ctx, f))
            .transpose()?;

        let window_spec = WindowSpecification {
            partition_by,
            order_by: order_by.clone(),
            frame,
        };

        // Build the function translation context for window functions
        // Allow both Window and Agg positions since aggregate functions can be used
        // as window functions with an OVER clause (e.g., sum(x) OVER (...))
        let saved_fctx = std::mem::replace(
            &mut ctx.fctx,
            FunctionTranslationContext::default()
                .with_special_allowed(SpecialPosition::Window)
                .with_special_allowed(SpecialPosition::Agg)
                .with_window(WindowReference::WindowSpecification(window_spec)),
        );

        // Translate window projections
        let mut projections = Vec::with_capacity(window_cmd.projections.len());
        for assignment in &window_cmd.projections {
            let expr_sql = translate_expression(ctx, &assignment.expression)?;
            let name: SQLSimpleIdentifier = assignment.identifier.clone().into();
            projections.push(Binding::new(name, expr_sql).into());
        }

        // Restore the original fctx
        ctx.fctx = saved_fctx;

        // WINDOW passes through all input columns in addition to the window projections.
        // Partition-by columns are pre-projected by the normalization pass, so they
        // exist as regular input columns and pass through naturally.
        let window_projection_names: std::collections::HashSet<_> = window_cmd
            .projections
            .iter()
            .map(|a| a.identifier.clone())
            .collect();

        for (col_name, _col_type) in cmd.output_schema.as_struct().iter() {
            let simple_id: SimpleIdentifier = col_name.clone().into();
            if !window_projection_names.contains(&simple_id) {
                // Pass through this column from the input
                let sql_name: SQLSimpleIdentifier = simple_id.into();
                let col_ref = ColumnReference::new(sql_name.clone().into());
                projections.push(Binding::new(sql_name, col_ref.into()).into());
            }
        }

        // Build the final query
        let result = SQLQuery::default().select(projections);
        match query {
            Some(q) => Ok(result.from(SubQuery::new(q.into()).into())),
            None => Ok(result),
        }
    }

    /// Translate a JOIN command.
    ///
    /// The query built so far becomes the left side, wrapped in a subquery
    /// aliased `_left`. The right side is a table reference to
    /// `join_cmd.right`, aliased with its own name. The two are joined with the
    /// translated condition using the SQL join type that corresponds to the IR
    /// join type, and the result projects the command's output schema.
    ///
    /// # Errors
    ///
    /// Fails with "JOIN requires a left side" when there is no input query, or
    /// with the error produced by translating the join condition.
    fn translate_join(
        &self,
        ctx: &mut TranslationContext,
        cmd: &IRCommand,
        join_cmd: &IRJoinCommand,
        query: QueryInput,
    ) -> CommandTranslationResult {
        // Without a preceding query there is nothing to join against.
        let left_query = match query {
            Some(existing) => existing,
            None => {
                return Err(TranslationError::msg(cmd, "JOIN requires a left side").single());
            }
        };

        // Left side: the existing query as an aliased subquery.
        let left_side =
            SubQuery::new(left_query.into()).alias(SQLSimpleIdentifier::new("_left").into());

        // Right side: a reference by name, aliased to that same name.
        let right_name: SQLSimpleIdentifier = join_cmd.right.clone().into();
        let right_side = TableReference::new(right_name.clone().into()).alias(right_name.into());

        let condition = translate_expression(ctx, &join_cmd.condition)?;

        let join_type = match join_cmd.join_type {
            IRJoinType::Inner => SQLJoinType::INNER,
            IRJoinType::Left => SQLJoinType::LEFT,
        };

        let clause = JoinClause {
            table: right_side.into(),
            join_type,
            condition: Some(condition),
        };
        let joined = Join::new(left_side.into()).with_clause(clause);

        Ok(SQLQuery::default()
            .from(joined.into())
            .select(build_projections_from_schema(&cmd.output_schema)))
    }

    /// Translate an EXPLODE command.
    ///
    /// **No default implementation** - this is the only required method of the
    /// trait, because EXPLODE syntax varies significantly across dialects:
    /// - Trino: `CROSS JOIN UNNEST(array_col) AS t(col_name)`
    /// - DuckDB: `unnest(array_col)` as a table function
    ///
    /// Each dialect must provide its own implementation. It receives the same
    /// arguments as the other `translate_*` methods: the translation context,
    /// the enclosing command, the EXPLODE-specific payload, and the query built
    /// so far (`None` when no earlier command has produced one).
    fn translate_explode(
        &self,
        _ctx: &mut TranslationContext,
        cmd: &IRCommand,
        _explode_cmd: &IRExplodeCommand,
        _query: QueryInput,
    ) -> CommandTranslationResult;
}

// =============================================================================
// Helper functions (used by trait default implementations)
// =============================================================================

/// Turn an IR input into the table expression that references it.
///
/// Both kinds of input end up as a plain table reference by name:
/// - `IRInput::Table` references the table identifier directly;
/// - `IRInput::With` references the CTE by its name. The pipeline carried by
///   this variant is ignored here (it exists for backends that inline CTEs).
fn translate_input(input: &IRInput) -> TableExpression {
    let target: SQLIdentifier = match input {
        IRInput::Table(table_id) => table_id.clone().into(),
        IRInput::With(cte_name, _pipeline) => cte_name.clone().into(),
    };
    TableReference::new(target).into()
}

/// Convert IR sort expressions into SQL ORDER BY expressions.
///
/// Each expression is translated in order and paired with the SQL direction
/// matching its sort order. Translation stops at the first expression that
/// fails, and that error is returned.
fn translate_sort_expressions(
    ctx: &TranslationContext,
    sort_exprs: &[IRSortExpression],
) -> Result<Vec<OrderByExpression>, TranslationErrors> {
    sort_exprs
        .iter()
        .map(|item| -> Result<OrderByExpression, TranslationErrors> {
            let sql = translate_expression(ctx, &item.expression)?;
            let direction = match item.order {
                SortOrder::Asc => Direction::ASC,
                SortOrder::Desc => Direction::DESC,
            };
            Ok(OrderByExpression::new(sql, direction))
        })
        .collect()
}

/// Convert an IR window frame into its SQL counterpart.
///
/// The frame's `start` bound fills the SQL frame's `preceding` slot and its
/// `end` bound fills the `following` slot. ROWS bounds are converted
/// infallibly; RANGE bounds go through the expression translator and may
/// therefore fail.
fn translate_window_frame(
    ctx: &TranslationContext,
    frame: &WindowFrame,
) -> Result<SQLWindowFrame, TranslationErrors> {
    let (frame_type, preceding, following) = match frame {
        WindowFrame::Rows { start, end } => (
            FrameType::ROWS,
            translate_row_bound(start),
            translate_row_bound(end),
        ),
        WindowFrame::Range { start, end } => (
            FrameType::RANGE,
            translate_range_bound(ctx, start)?,
            translate_range_bound(ctx, end)?,
        ),
    };

    Ok(SQLWindowFrame {
        frame_type,
        preceding,
        following,
    })
}

/// Convert a ROWS frame bound into a SQL frame boundary.
///
/// Unbounded and current-row bounds map to their dedicated boundary variants.
/// `Preceding(n)` and `Following(n)` both become a boundary expression holding
/// the integer literal `n`; the returned value does not record the direction.
// NOTE(review): presumably the direction is implied by which `SQLWindowFrame`
// field the caller stores the boundary in - confirm.
fn translate_row_bound(bound: &RowBound) -> FrameBoundary {
    let offset = match bound {
        RowBound::Unbounded => return FrameBoundary::Unbounded,
        RowBound::CurrentRow => return FrameBoundary::CurrentRowBoundary,
        RowBound::Preceding(n) | RowBound::Following(n) => *n,
    };
    FrameBoundary::BoundaryExpression(Box::new(IntegerLiteral::from_int(offset as i64).into()))
}

/// Convert a RANGE frame bound into a SQL frame boundary.
///
/// Unbounded and current-row bounds map to their dedicated boundary variants.
/// `Preceding(expr)` and `Following(expr)` both become a boundary expression
/// holding the translated `expr`; a failure to translate it is returned as the
/// error.
fn translate_range_bound(
    ctx: &TranslationContext,
    bound: &RangeBound,
) -> Result<FrameBoundary, TranslationErrors> {
    let offset_expr = match bound {
        RangeBound::Unbounded => return Ok(FrameBoundary::Unbounded),
        RangeBound::CurrentRow => return Ok(FrameBoundary::CurrentRowBoundary),
        RangeBound::Preceding(expr) | RangeBound::Following(expr) => expr,
    };
    let sql = translate_expression(ctx, offset_expr)?;
    Ok(FrameBoundary::BoundaryExpression(Box::new(sql)))
}