Skip to main content

icydb_core/db/sql/lowering/
mod.rs

1//! Module: db::sql::lowering
2//! Responsibility: reduced SQL statement lowering into canonical query intent.
3//! Does not own: SQL tokenization/parsing, planner validation policy, or executor semantics.
4//! Boundary: frontend-only translation from parsed SQL statement contracts to `Query<E>`.
5
6///
7/// TESTS
8///
9
10#[cfg(test)]
11mod tests;
12
13use crate::{
14    db::{
15        predicate::{CoercionId, CompareOp, MissingRowPolicy, Predicate},
16        query::{
17            builder::aggregate::{avg, count, count_by, max_by, min_by, sum},
18            intent::{Query, QueryError, StructuralQuery},
19            plan::{ExpressionOrderTerm, FieldSlot, resolve_aggregate_target_field_slot},
20        },
21        sql::identifier::{
22            identifier_last_segment, identifiers_tail_match, normalize_identifier_to_scope,
23            rewrite_field_identifiers,
24        },
25        sql::parser::{
26            SqlAggregateCall, SqlAggregateKind, SqlDeleteStatement, SqlExplainMode,
27            SqlExplainStatement, SqlExplainTarget, SqlHavingClause, SqlHavingSymbol,
28            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
29            SqlStatement, SqlTextFunctionCall,
30        },
31    },
32    model::{entity::EntityModel, field::FieldKind},
33    traits::EntityKind,
34    value::Value,
35};
36use thiserror::Error as ThisError;
37
38///
39/// LoweredSqlCommand
40///
41/// Generic-free SQL command shape after reduced SQL parsing and entity-route
42/// normalization.
43/// This keeps statement-shape lowering shared across entities before typed
44/// `Query<E>` binding happens at the execution boundary.
45///
46#[derive(Clone, Debug)]
47pub struct LoweredSqlCommand(LoweredSqlCommandInner);
48
49#[derive(Clone, Debug)]
50enum LoweredSqlCommandInner {
51    Query(LoweredSqlQuery),
52    Explain {
53        mode: SqlExplainMode,
54        query: LoweredSqlQuery,
55    },
56    ExplainGlobalAggregate {
57        mode: SqlExplainMode,
58        command: LoweredSqlGlobalAggregateCommand,
59    },
60    DescribeEntity,
61    ShowIndexesEntity,
62    ShowColumnsEntity,
63    ShowEntities,
64}
65
66///
67/// SqlCommand
68///
69/// Test-only typed SQL command shell over the shared lowered SQL surface.
70/// Runtime dispatch now consumes `LoweredSqlCommand` directly, but lowering
71/// tests still validate typed binding behavior on this local envelope.
72///
73#[cfg(test)]
74#[derive(Debug)]
75pub(crate) enum SqlCommand<E: EntityKind> {
76    Query(Query<E>),
77    Explain {
78        mode: SqlExplainMode,
79        query: Query<E>,
80    },
81    ExplainGlobalAggregate {
82        mode: SqlExplainMode,
83        command: SqlGlobalAggregateCommand<E>,
84    },
85    DescribeEntity,
86    ShowIndexesEntity,
87    ShowColumnsEntity,
88    ShowEntities,
89}
90
91impl LoweredSqlCommand {
92    #[must_use]
93    pub(in crate::db) const fn query(&self) -> Option<&LoweredSqlQuery> {
94        match &self.0 {
95            LoweredSqlCommandInner::Query(query) => Some(query),
96            LoweredSqlCommandInner::Explain { .. }
97            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
98            | LoweredSqlCommandInner::DescribeEntity
99            | LoweredSqlCommandInner::ShowIndexesEntity
100            | LoweredSqlCommandInner::ShowColumnsEntity
101            | LoweredSqlCommandInner::ShowEntities => None,
102        }
103    }
104
105    #[must_use]
106    pub(in crate::db) const fn explain_query(&self) -> Option<(SqlExplainMode, &LoweredSqlQuery)> {
107        match &self.0 {
108            LoweredSqlCommandInner::Explain { mode, query } => Some((*mode, query)),
109            LoweredSqlCommandInner::Query(_)
110            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
111            | LoweredSqlCommandInner::DescribeEntity
112            | LoweredSqlCommandInner::ShowIndexesEntity
113            | LoweredSqlCommandInner::ShowColumnsEntity
114            | LoweredSqlCommandInner::ShowEntities => None,
115        }
116    }
117}
118
119///
120/// LoweredSqlQuery
121///
122/// Generic-free executable SQL query shape prepared before typed query binding.
123/// Select and delete lowering stay shared until the final `Query<E>` build.
124///
125#[derive(Clone, Debug)]
126pub(crate) enum LoweredSqlQuery {
127    Select(LoweredSelectShape),
128    Delete(LoweredBaseQueryShape),
129}
130
131impl LoweredSqlQuery {
132    // Report whether this lowered query carries grouped execution semantics.
133    pub(crate) const fn has_grouping(&self) -> bool {
134        match self {
135            Self::Select(select) => select.has_grouping(),
136            Self::Delete(_) => false,
137        }
138    }
139}
140
141///
142/// SqlGlobalAggregateTerminal
143///
144/// Global SQL aggregate terminals currently executable through dedicated
145/// aggregate SQL entrypoints.
146///
147
148#[derive(Clone, Debug, Eq, PartialEq)]
149pub(crate) enum SqlGlobalAggregateTerminal {
150    CountRows,
151    CountField(String),
152    SumField(String),
153    AvgField(String),
154    MinField(String),
155    MaxField(String),
156}
157
158///
159/// TypedSqlGlobalAggregateTerminal
160///
161/// TypedSqlGlobalAggregateTerminal is the typed global aggregate contract used
162/// after entity binding resolves one concrete model.
163/// Field-target variants carry a resolved planner field slot so typed SQL
164/// aggregate execution does not re-resolve the same field name before dispatch.
165///
166#[derive(Clone, Debug, Eq, PartialEq)]
167pub(crate) enum TypedSqlGlobalAggregateTerminal {
168    CountRows,
169    CountField(FieldSlot),
170    SumField(FieldSlot),
171    AvgField(FieldSlot),
172    MinField(FieldSlot),
173    MaxField(FieldSlot),
174}
175
176///
177/// LoweredSqlGlobalAggregateCommand
178///
179/// Generic-free global aggregate command shape prepared before typed query
180/// binding.
181/// This keeps aggregate SQL lowering shared across entities until the final
182/// execution boundary converts the base query shape into `Query<E>`.
183///
184#[derive(Clone, Debug)]
185pub(crate) struct LoweredSqlGlobalAggregateCommand {
186    query: LoweredBaseQueryShape,
187    terminal: SqlGlobalAggregateTerminal,
188}
189
190///
191/// LoweredSqlAggregateShape
192///
193/// Locally validated aggregate-call shape used by SQL lowering to avoid
194/// duplicating `(SqlAggregateKind, field)` validation across lowering lanes.
195///
196
197enum LoweredSqlAggregateShape {
198    CountRows,
199    CountField(String),
200    FieldTarget {
201        kind: SqlAggregateKind,
202        field: String,
203    },
204}
205
206///
207/// SqlGlobalAggregateCommand
208///
209/// Lowered global SQL aggregate command carrying base query shape plus terminal.
210///
211
212#[derive(Debug)]
213pub(crate) struct SqlGlobalAggregateCommand<E: EntityKind> {
214    query: Query<E>,
215    terminal: TypedSqlGlobalAggregateTerminal,
216}
217
218impl<E: EntityKind> SqlGlobalAggregateCommand<E> {
219    /// Borrow the lowered base query shape for aggregate execution.
220    #[must_use]
221    pub(crate) const fn query(&self) -> &Query<E> {
222        &self.query
223    }
224
225    /// Borrow the lowered aggregate terminal.
226    #[must_use]
227    pub(crate) const fn terminal(&self) -> &TypedSqlGlobalAggregateTerminal {
228        &self.terminal
229    }
230}
231
232///
233/// SqlGlobalAggregateCommandCore
234///
235/// Generic-free lowered global aggregate command bound onto the structural
236/// query surface.
237/// This keeps global aggregate EXPLAIN on the shared query/explain path until
238/// a typed boundary is strictly required.
239///
240
241#[derive(Debug)]
242pub(crate) struct SqlGlobalAggregateCommandCore {
243    query: StructuralQuery,
244    terminal: SqlGlobalAggregateTerminal,
245}
246
247impl SqlGlobalAggregateCommandCore {
248    /// Borrow the structural query payload for aggregate explain/execution.
249    #[must_use]
250    pub(in crate::db) const fn query(&self) -> &StructuralQuery {
251        &self.query
252    }
253
254    /// Borrow the lowered aggregate terminal.
255    #[must_use]
256    pub(in crate::db) const fn terminal(&self) -> &SqlGlobalAggregateTerminal {
257        &self.terminal
258    }
259}
260
261///
262/// SqlLoweringError
263///
264/// SQL frontend lowering failures before planner validation/execution.
265///
266
267#[derive(Debug, ThisError)]
268pub(crate) enum SqlLoweringError {
269    #[error("{0}")]
270    Parse(#[from] crate::db::sql::parser::SqlParseError),
271
272    #[error("{0}")]
273    Query(#[from] QueryError),
274
275    #[error("SQL entity '{sql_entity}' does not match requested entity type '{expected_entity}'")]
276    EntityMismatch {
277        sql_entity: String,
278        expected_entity: &'static str,
279    },
280
281    #[error(
282        "unsupported SQL SELECT projection; supported forms are SELECT *, field lists, or grouped aggregate shapes"
283    )]
284    UnsupportedSelectProjection,
285
286    #[error("unsupported SQL SELECT DISTINCT")]
287    UnsupportedSelectDistinct,
288
289    #[error("unsupported SQL GROUP BY projection shape")]
290    UnsupportedSelectGroupBy,
291
292    #[error("unsupported SQL HAVING shape")]
293    UnsupportedSelectHaving,
294}
295
296impl SqlLoweringError {
297    /// Construct one entity-mismatch SQL lowering error.
298    fn entity_mismatch(sql_entity: impl Into<String>, expected_entity: &'static str) -> Self {
299        Self::EntityMismatch {
300            sql_entity: sql_entity.into(),
301            expected_entity,
302        }
303    }
304
305    /// Construct one unsupported SELECT projection SQL lowering error.
306    const fn unsupported_select_projection() -> Self {
307        Self::UnsupportedSelectProjection
308    }
309
310    /// Construct one unsupported SELECT DISTINCT SQL lowering error.
311    const fn unsupported_select_distinct() -> Self {
312        Self::UnsupportedSelectDistinct
313    }
314
315    /// Construct one unsupported SELECT GROUP BY shape SQL lowering error.
316    const fn unsupported_select_group_by() -> Self {
317        Self::UnsupportedSelectGroupBy
318    }
319
320    /// Construct one unsupported SELECT HAVING shape SQL lowering error.
321    const fn unsupported_select_having() -> Self {
322        Self::UnsupportedSelectHaving
323    }
324}
325
326///
327/// PreparedSqlStatement
328///
329/// SQL statement envelope after entity-scope normalization and
330/// entity-match validation for one target entity descriptor.
331///
332/// This pre-lowering contract is entity-agnostic and reusable across
333/// dynamic SQL route branches before typed `Query<E>` binding.
334///
335
336#[derive(Clone, Debug)]
337pub(crate) struct PreparedSqlStatement {
338    statement: SqlStatement,
339}
340
341#[derive(Clone, Copy, Debug, Eq, PartialEq)]
342pub(crate) enum LoweredSqlLaneKind {
343    Query,
344    Explain,
345    Describe,
346    ShowIndexes,
347    ShowColumns,
348    ShowEntities,
349}
350
351/// Parse and lower one SQL statement into canonical query intent for `E`.
352#[cfg(test)]
353pub(crate) fn compile_sql_command<E: EntityKind>(
354    sql: &str,
355    consistency: MissingRowPolicy,
356) -> Result<SqlCommand<E>, SqlLoweringError> {
357    let statement = crate::db::sql::parser::parse_sql(sql)?;
358    compile_sql_command_from_statement::<E>(statement, consistency)
359}
360
361/// Lower one parsed SQL statement into canonical query intent for `E`.
362#[cfg(test)]
363pub(crate) fn compile_sql_command_from_statement<E: EntityKind>(
364    statement: SqlStatement,
365    consistency: MissingRowPolicy,
366) -> Result<SqlCommand<E>, SqlLoweringError> {
367    let prepared = prepare_sql_statement(statement, E::MODEL.name())?;
368    compile_sql_command_from_prepared_statement::<E>(prepared, consistency)
369}
370
371/// Lower one prepared SQL statement into canonical query intent for `E`.
372#[cfg(test)]
373pub(crate) fn compile_sql_command_from_prepared_statement<E: EntityKind>(
374    prepared: PreparedSqlStatement,
375    consistency: MissingRowPolicy,
376) -> Result<SqlCommand<E>, SqlLoweringError> {
377    let lowered = lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)?;
378
379    bind_lowered_sql_command::<E>(lowered, consistency)
380}
381
382/// Lower one prepared SQL statement into one shared generic-free command shape.
383#[inline(never)]
384pub(crate) fn lower_sql_command_from_prepared_statement(
385    prepared: PreparedSqlStatement,
386    primary_key_field: &str,
387) -> Result<LoweredSqlCommand, SqlLoweringError> {
388    lower_prepared_statement(prepared.statement, primary_key_field)
389}
390
391pub(crate) const fn lowered_sql_command_lane(command: &LoweredSqlCommand) -> LoweredSqlLaneKind {
392    match command.0 {
393        LoweredSqlCommandInner::Query(_) => LoweredSqlLaneKind::Query,
394        LoweredSqlCommandInner::Explain { .. }
395        | LoweredSqlCommandInner::ExplainGlobalAggregate { .. } => LoweredSqlLaneKind::Explain,
396        LoweredSqlCommandInner::DescribeEntity => LoweredSqlLaneKind::Describe,
397        LoweredSqlCommandInner::ShowIndexesEntity => LoweredSqlLaneKind::ShowIndexes,
398        LoweredSqlCommandInner::ShowColumnsEntity => LoweredSqlLaneKind::ShowColumns,
399        LoweredSqlCommandInner::ShowEntities => LoweredSqlLaneKind::ShowEntities,
400    }
401}
402
403/// Return whether one parsed SQL statement is an executable constrained global
404/// aggregate shape owned by the dedicated aggregate lane.
405pub(in crate::db) fn is_sql_global_aggregate_statement(statement: &SqlStatement) -> bool {
406    let SqlStatement::Select(statement) = statement else {
407        return false;
408    };
409
410    is_sql_global_aggregate_select(statement)
411}
412
413// Detect one constrained global aggregate select shape without widening any
414// non-aggregate SQL surface onto the dedicated aggregate execution lane.
415fn is_sql_global_aggregate_select(statement: &SqlSelectStatement) -> bool {
416    if statement.distinct || !statement.group_by.is_empty() || !statement.having.is_empty() {
417        return false;
418    }
419
420    lower_global_aggregate_terminal(statement.projection.clone()).is_ok()
421}
422
423/// Bind one lowered global aggregate EXPLAIN shape onto the structural query
424/// surface when the explain command carries that specialized form.
425pub(crate) fn bind_lowered_sql_explain_global_aggregate_structural(
426    lowered: &LoweredSqlCommand,
427    model: &'static crate::model::entity::EntityModel,
428    consistency: MissingRowPolicy,
429) -> Option<(SqlExplainMode, SqlGlobalAggregateCommandCore)> {
430    let LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } = &lowered.0 else {
431        return None;
432    };
433
434    Some((
435        *mode,
436        bind_lowered_sql_global_aggregate_command_structural(model, command.clone(), consistency),
437    ))
438}
439
440/// Bind one shared generic-free SQL command shape to the typed query surface.
441#[cfg(test)]
442pub(crate) fn bind_lowered_sql_command<E: EntityKind>(
443    lowered: LoweredSqlCommand,
444    consistency: MissingRowPolicy,
445) -> Result<SqlCommand<E>, SqlLoweringError> {
446    match lowered.0 {
447        LoweredSqlCommandInner::Query(query) => Ok(SqlCommand::Query(bind_lowered_sql_query::<E>(
448            query,
449            consistency,
450        )?)),
451        LoweredSqlCommandInner::Explain { mode, query } => Ok(SqlCommand::Explain {
452            mode,
453            query: bind_lowered_sql_query::<E>(query, consistency)?,
454        }),
455        LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } => {
456            Ok(SqlCommand::ExplainGlobalAggregate {
457                mode,
458                command: bind_lowered_sql_global_aggregate_command::<E>(command, consistency)?,
459            })
460        }
461        LoweredSqlCommandInner::DescribeEntity => Ok(SqlCommand::DescribeEntity),
462        LoweredSqlCommandInner::ShowIndexesEntity => Ok(SqlCommand::ShowIndexesEntity),
463        LoweredSqlCommandInner::ShowColumnsEntity => Ok(SqlCommand::ShowColumnsEntity),
464        LoweredSqlCommandInner::ShowEntities => Ok(SqlCommand::ShowEntities),
465    }
466}
467
468/// Prepare one parsed SQL statement for one expected entity route.
469#[inline(never)]
470pub(crate) fn prepare_sql_statement(
471    statement: SqlStatement,
472    expected_entity: &'static str,
473) -> Result<PreparedSqlStatement, SqlLoweringError> {
474    let statement = prepare_statement(statement, expected_entity)?;
475
476    Ok(PreparedSqlStatement { statement })
477}
478
479/// Parse and lower one SQL statement into global aggregate execution command for `E`.
480#[cfg(test)]
481pub(crate) fn compile_sql_global_aggregate_command<E: EntityKind>(
482    sql: &str,
483    consistency: MissingRowPolicy,
484) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
485    let statement = crate::db::sql::parser::parse_sql(sql)?;
486    let prepared = prepare_sql_statement(statement, E::MODEL.name())?;
487    compile_sql_global_aggregate_command_from_prepared::<E>(prepared, consistency)
488}
489
490// Lower one already-prepared SQL statement into the constrained global
491// aggregate command envelope so callers that already parsed and routed the
492// statement do not pay the parser again.
493pub(crate) fn compile_sql_global_aggregate_command_from_prepared<E: EntityKind>(
494    prepared: PreparedSqlStatement,
495    consistency: MissingRowPolicy,
496) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
497    let SqlStatement::Select(statement) = prepared.statement else {
498        return Err(SqlLoweringError::unsupported_select_projection());
499    };
500
501    bind_lowered_sql_global_aggregate_command::<E>(
502        lower_global_aggregate_select_shape(statement)?,
503        consistency,
504    )
505}
506
507fn bind_lowered_sql_global_aggregate_terminal<E: EntityKind>(
508    terminal: SqlGlobalAggregateTerminal,
509) -> Result<TypedSqlGlobalAggregateTerminal, SqlLoweringError> {
510    let resolve_target_slot = |field: &str| {
511        resolve_aggregate_target_field_slot(E::MODEL, field).map_err(SqlLoweringError::from)
512    };
513
514    match terminal {
515        SqlGlobalAggregateTerminal::CountRows => Ok(TypedSqlGlobalAggregateTerminal::CountRows),
516        SqlGlobalAggregateTerminal::CountField(field) => Ok(
517            TypedSqlGlobalAggregateTerminal::CountField(resolve_target_slot(field.as_str())?),
518        ),
519        SqlGlobalAggregateTerminal::SumField(field) => Ok(
520            TypedSqlGlobalAggregateTerminal::SumField(resolve_target_slot(field.as_str())?),
521        ),
522        SqlGlobalAggregateTerminal::AvgField(field) => Ok(
523            TypedSqlGlobalAggregateTerminal::AvgField(resolve_target_slot(field.as_str())?),
524        ),
525        SqlGlobalAggregateTerminal::MinField(field) => Ok(
526            TypedSqlGlobalAggregateTerminal::MinField(resolve_target_slot(field.as_str())?),
527        ),
528        SqlGlobalAggregateTerminal::MaxField(field) => Ok(
529            TypedSqlGlobalAggregateTerminal::MaxField(resolve_target_slot(field.as_str())?),
530        ),
531    }
532}
533
534#[inline(never)]
535fn prepare_statement(
536    statement: SqlStatement,
537    expected_entity: &'static str,
538) -> Result<SqlStatement, SqlLoweringError> {
539    match statement {
540        SqlStatement::Select(statement) => Ok(SqlStatement::Select(prepare_select_statement(
541            statement,
542            expected_entity,
543        )?)),
544        SqlStatement::Delete(statement) => Ok(SqlStatement::Delete(prepare_delete_statement(
545            statement,
546            expected_entity,
547        )?)),
548        SqlStatement::Explain(statement) => Ok(SqlStatement::Explain(prepare_explain_statement(
549            statement,
550            expected_entity,
551        )?)),
552        SqlStatement::Describe(statement) => {
553            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
554
555            Ok(SqlStatement::Describe(statement))
556        }
557        SqlStatement::ShowIndexes(statement) => {
558            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
559
560            Ok(SqlStatement::ShowIndexes(statement))
561        }
562        SqlStatement::ShowColumns(statement) => {
563            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
564
565            Ok(SqlStatement::ShowColumns(statement))
566        }
567        SqlStatement::ShowEntities(statement) => Ok(SqlStatement::ShowEntities(statement)),
568    }
569}
570
571fn prepare_explain_statement(
572    statement: SqlExplainStatement,
573    expected_entity: &'static str,
574) -> Result<SqlExplainStatement, SqlLoweringError> {
575    let target = match statement.statement {
576        SqlExplainTarget::Select(select_statement) => {
577            SqlExplainTarget::Select(prepare_select_statement(select_statement, expected_entity)?)
578        }
579        SqlExplainTarget::Delete(delete_statement) => {
580            SqlExplainTarget::Delete(prepare_delete_statement(delete_statement, expected_entity)?)
581        }
582    };
583
584    Ok(SqlExplainStatement {
585        mode: statement.mode,
586        statement: target,
587    })
588}
589
590fn prepare_select_statement(
591    statement: SqlSelectStatement,
592    expected_entity: &'static str,
593) -> Result<SqlSelectStatement, SqlLoweringError> {
594    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
595
596    Ok(normalize_select_statement_to_expected_entity(
597        statement,
598        expected_entity,
599    ))
600}
601
602fn normalize_select_statement_to_expected_entity(
603    mut statement: SqlSelectStatement,
604    expected_entity: &'static str,
605) -> SqlSelectStatement {
606    // Re-scope parsed identifiers onto the resolved entity surface after the
607    // caller has already established entity ownership for this statement.
608    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
609    statement.projection =
610        normalize_projection_identifiers(statement.projection, entity_scope.as_slice());
611    statement.group_by = normalize_identifier_list(statement.group_by, entity_scope.as_slice());
612    statement.predicate = statement
613        .predicate
614        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
615    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
616    statement.having = normalize_having_clauses(statement.having, entity_scope.as_slice());
617
618    statement
619}
620
621fn prepare_delete_statement(
622    mut statement: SqlDeleteStatement,
623    expected_entity: &'static str,
624) -> Result<SqlDeleteStatement, SqlLoweringError> {
625    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
626    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
627    statement.predicate = statement
628        .predicate
629        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
630    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
631
632    Ok(statement)
633}
634
635#[inline(never)]
636fn lower_prepared_statement(
637    statement: SqlStatement,
638    primary_key_field: &str,
639) -> Result<LoweredSqlCommand, SqlLoweringError> {
640    match statement {
641        SqlStatement::Select(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
642            LoweredSqlQuery::Select(lower_select_shape(statement, primary_key_field)?),
643        ))),
644        SqlStatement::Delete(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
645            LoweredSqlQuery::Delete(lower_delete_shape(statement)),
646        ))),
647        SqlStatement::Explain(statement) => lower_explain_prepared(statement, primary_key_field),
648        SqlStatement::Describe(_) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::DescribeEntity)),
649        SqlStatement::ShowIndexes(_) => {
650            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowIndexesEntity))
651        }
652        SqlStatement::ShowColumns(_) => {
653            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowColumnsEntity))
654        }
655        SqlStatement::ShowEntities(_) => {
656            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowEntities))
657        }
658    }
659}
660
661fn lower_explain_prepared(
662    statement: SqlExplainStatement,
663    primary_key_field: &str,
664) -> Result<LoweredSqlCommand, SqlLoweringError> {
665    let mode = statement.mode;
666
667    match statement.statement {
668        SqlExplainTarget::Select(select_statement) => {
669            lower_explain_select_prepared(select_statement, mode, primary_key_field)
670        }
671        SqlExplainTarget::Delete(delete_statement) => {
672            Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
673                mode,
674                query: LoweredSqlQuery::Delete(lower_delete_shape(delete_statement)),
675            }))
676        }
677    }
678}
679
680fn lower_explain_select_prepared(
681    statement: SqlSelectStatement,
682    mode: SqlExplainMode,
683    primary_key_field: &str,
684) -> Result<LoweredSqlCommand, SqlLoweringError> {
685    match lower_select_shape(statement.clone(), primary_key_field) {
686        Ok(query) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
687            mode,
688            query: LoweredSqlQuery::Select(query),
689        })),
690        Err(SqlLoweringError::UnsupportedSelectProjection) => {
691            let command = lower_global_aggregate_select_shape(statement)?;
692
693            Ok(LoweredSqlCommand(
694                LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command },
695            ))
696        }
697        Err(err) => Err(err),
698    }
699}
700
701fn lower_global_aggregate_select_shape(
702    statement: SqlSelectStatement,
703) -> Result<LoweredSqlGlobalAggregateCommand, SqlLoweringError> {
704    let SqlSelectStatement {
705        projection,
706        predicate,
707        distinct,
708        group_by,
709        having,
710        order_by,
711        limit,
712        offset,
713        entity: _,
714    } = statement;
715
716    if distinct {
717        return Err(SqlLoweringError::unsupported_select_distinct());
718    }
719    if !group_by.is_empty() {
720        return Err(SqlLoweringError::unsupported_select_group_by());
721    }
722    if !having.is_empty() {
723        return Err(SqlLoweringError::unsupported_select_having());
724    }
725
726    let terminal = lower_global_aggregate_terminal(projection)?;
727
728    Ok(LoweredSqlGlobalAggregateCommand {
729        query: LoweredBaseQueryShape {
730            predicate,
731            order_by,
732            limit,
733            offset,
734        },
735        terminal,
736    })
737}
738
739///
740/// ResolvedHavingClause
741///
742/// Pre-resolved HAVING clause shape after SQL projection aggregate index
743/// resolution. This keeps SQL shape analysis entity-agnostic before typed
744/// query binding.
745///
746#[derive(Clone, Debug)]
747enum ResolvedHavingClause {
748    GroupField {
749        field: String,
750        op: crate::db::predicate::CompareOp,
751        value: crate::value::Value,
752    },
753    Aggregate {
754        aggregate_index: usize,
755        op: crate::db::predicate::CompareOp,
756        value: crate::value::Value,
757    },
758}
759
760///
761/// LoweredSelectShape
762///
763/// Entity-agnostic lowered SQL SELECT shape prepared for typed `Query<E>`
764/// binding.
765///
766#[derive(Clone, Debug)]
767pub(crate) struct LoweredSelectShape {
768    scalar_projection_fields: Option<Vec<String>>,
769    grouped_projection_aggregates: Vec<SqlAggregateCall>,
770    group_by_fields: Vec<String>,
771    distinct: bool,
772    having: Vec<ResolvedHavingClause>,
773    predicate: Option<Predicate>,
774    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
775    limit: Option<u32>,
776    offset: Option<u32>,
777}
778
779impl LoweredSelectShape {
780    // Report whether this lowered select shape carries grouped execution state.
781    const fn has_grouping(&self) -> bool {
782        !self.group_by_fields.is_empty()
783    }
784}
785
786///
787/// LoweredBaseQueryShape
788///
789/// Generic-free filter/order/window query modifiers shared by delete and
790/// global-aggregate SQL lowering.
791/// This keeps common SQL query-shape lowering shared before typed query
792/// binding.
793///
794#[derive(Clone, Debug)]
795pub(crate) struct LoweredBaseQueryShape {
796    predicate: Option<Predicate>,
797    order_by: Vec<SqlOrderTerm>,
798    limit: Option<u32>,
799    offset: Option<u32>,
800}
801
802#[inline(never)]
803fn lower_select_shape(
804    statement: SqlSelectStatement,
805    primary_key_field: &str,
806) -> Result<LoweredSelectShape, SqlLoweringError> {
807    let SqlSelectStatement {
808        projection,
809        predicate,
810        distinct,
811        group_by,
812        having,
813        order_by,
814        limit,
815        offset,
816        entity: _,
817    } = statement;
818    let projection_for_having = projection.clone();
819
820    // Phase 1: resolve scalar/grouped projection shape.
821    let (scalar_projection_fields, grouped_projection_aggregates) = if group_by.is_empty() {
822        let scalar_projection_fields =
823            lower_scalar_projection_fields(projection, distinct, primary_key_field)?;
824        (scalar_projection_fields, Vec::new())
825    } else {
826        if distinct {
827            return Err(SqlLoweringError::unsupported_select_distinct());
828        }
829        let grouped_projection_aggregates =
830            grouped_projection_aggregate_calls(&projection, group_by.as_slice())?;
831        (None, grouped_projection_aggregates)
832    };
833
834    // Phase 2: resolve HAVING symbols against grouped projection authority.
835    let having = lower_having_clauses(
836        having,
837        &projection_for_having,
838        group_by.as_slice(),
839        grouped_projection_aggregates.as_slice(),
840    )?;
841
842    Ok(LoweredSelectShape {
843        scalar_projection_fields,
844        grouped_projection_aggregates,
845        group_by_fields: group_by,
846        distinct,
847        having,
848        predicate,
849        order_by,
850        limit,
851        offset,
852    })
853}
854
855fn lower_scalar_projection_fields(
856    projection: SqlProjection,
857    distinct: bool,
858    primary_key_field: &str,
859) -> Result<Option<Vec<String>>, SqlLoweringError> {
860    let SqlProjection::Items(items) = projection else {
861        if distinct {
862            return Ok(None);
863        }
864
865        return Ok(None);
866    };
867
868    let has_aggregate = items
869        .iter()
870        .any(|item| matches!(item, SqlSelectItem::Aggregate(_)));
871    if has_aggregate {
872        return Err(SqlLoweringError::unsupported_select_projection());
873    }
874
875    let fields = items
876        .into_iter()
877        .map(|item| match item {
878            SqlSelectItem::Field(field) => Ok(field),
879            SqlSelectItem::Aggregate(_) | SqlSelectItem::TextFunction(_) => {
880                Err(SqlLoweringError::unsupported_select_projection())
881            }
882        })
883        .collect::<Result<Vec<_>, _>>()?;
884
885    validate_scalar_distinct_projection(distinct, fields.as_slice(), primary_key_field)?;
886
887    Ok(Some(fields))
888}
889
890fn validate_scalar_distinct_projection(
891    distinct: bool,
892    projection_fields: &[String],
893    primary_key_field: &str,
894) -> Result<(), SqlLoweringError> {
895    if !distinct {
896        return Ok(());
897    }
898
899    if projection_fields.is_empty() {
900        return Ok(());
901    }
902
903    let has_primary_key_field = projection_fields
904        .iter()
905        .any(|field| field == primary_key_field);
906    if !has_primary_key_field {
907        return Err(SqlLoweringError::unsupported_select_distinct());
908    }
909
910    Ok(())
911}
912
913fn lower_having_clauses(
914    having_clauses: Vec<SqlHavingClause>,
915    projection: &SqlProjection,
916    group_by_fields: &[String],
917    grouped_projection_aggregates: &[SqlAggregateCall],
918) -> Result<Vec<ResolvedHavingClause>, SqlLoweringError> {
919    if having_clauses.is_empty() {
920        return Ok(Vec::new());
921    }
922    if group_by_fields.is_empty() {
923        return Err(SqlLoweringError::unsupported_select_having());
924    }
925
926    let projection_aggregates = grouped_projection_aggregate_calls(projection, group_by_fields)
927        .map_err(|_| SqlLoweringError::unsupported_select_having())?;
928    if projection_aggregates.as_slice() != grouped_projection_aggregates {
929        return Err(SqlLoweringError::unsupported_select_having());
930    }
931
932    let mut lowered = Vec::with_capacity(having_clauses.len());
933    for clause in having_clauses {
934        match clause.symbol {
935            SqlHavingSymbol::Field(field) => lowered.push(ResolvedHavingClause::GroupField {
936                field,
937                op: clause.op,
938                value: clause.value,
939            }),
940            SqlHavingSymbol::Aggregate(aggregate) => {
941                let aggregate_index =
942                    resolve_having_aggregate_index(&aggregate, grouped_projection_aggregates)?;
943                lowered.push(ResolvedHavingClause::Aggregate {
944                    aggregate_index,
945                    op: clause.op,
946                    value: clause.value,
947                });
948            }
949        }
950    }
951
952    Ok(lowered)
953}
954
955// Canonicalize strict numeric SQL predicate literals onto the resolved model
956// field kind so unsigned-width fields keep strict/indexable semantics even
957// though reduced SQL integer tokens parse through one generic numeric value
958// variant first.
959fn canonicalize_sql_predicate_for_model(
960    model: &'static EntityModel,
961    predicate: Predicate,
962) -> Predicate {
963    match predicate {
964        Predicate::And(children) => Predicate::And(
965            children
966                .into_iter()
967                .map(|child| canonicalize_sql_predicate_for_model(model, child))
968                .collect(),
969        ),
970        Predicate::Or(children) => Predicate::Or(
971            children
972                .into_iter()
973                .map(|child| canonicalize_sql_predicate_for_model(model, child))
974                .collect(),
975        ),
976        Predicate::Not(inner) => Predicate::Not(Box::new(canonicalize_sql_predicate_for_model(
977            model, *inner,
978        ))),
979        Predicate::Compare(mut cmp) => {
980            canonicalize_sql_compare_for_model(model, &mut cmp);
981            Predicate::Compare(cmp)
982        }
983        Predicate::True
984        | Predicate::False
985        | Predicate::IsNull { .. }
986        | Predicate::IsNotNull { .. }
987        | Predicate::IsMissing { .. }
988        | Predicate::IsEmpty { .. }
989        | Predicate::IsNotEmpty { .. }
990        | Predicate::TextContains { .. }
991        | Predicate::TextContainsCi { .. } => predicate,
992    }
993}
994
995// Resolve one lowered predicate field onto the runtime model kind that owns
996// its strict literal compatibility rules.
997fn model_field_kind(model: &'static EntityModel, field: &str) -> Option<FieldKind> {
998    model
999        .fields()
1000        .iter()
1001        .find(|candidate| candidate.name() == field)
1002        .map(crate::model::field::FieldModel::kind)
1003}
1004
1005// Keep SQL-only literal widening narrow:
1006// - only strict equality-style numeric predicates are eligible
1007// - ordering already uses `NumericWiden`
1008// - text and expression-wrapped predicates stay untouched
1009fn canonicalize_sql_compare_for_model(
1010    model: &'static EntityModel,
1011    cmp: &mut crate::db::predicate::ComparePredicate,
1012) {
1013    if cmp.coercion.id != CoercionId::Strict {
1014        return;
1015    }
1016
1017    let Some(field_kind) = model_field_kind(model, &cmp.field) else {
1018        return;
1019    };
1020
1021    match cmp.op {
1022        CompareOp::Eq | CompareOp::Ne => {
1023            if let Some(value) =
1024                canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &cmp.value)
1025            {
1026                cmp.value = value;
1027            }
1028        }
1029        CompareOp::In | CompareOp::NotIn => {
1030            let Value::List(items) = &cmp.value else {
1031                return;
1032            };
1033
1034            let items = items
1035                .iter()
1036                .map(|item| {
1037                    canonicalize_strict_sql_numeric_value_for_kind(&field_kind, item)
1038                        .unwrap_or_else(|| item.clone())
1039                })
1040                .collect();
1041            cmp.value = Value::List(items);
1042        }
1043        CompareOp::Lt
1044        | CompareOp::Lte
1045        | CompareOp::Gt
1046        | CompareOp::Gte
1047        | CompareOp::Contains
1048        | CompareOp::StartsWith
1049        | CompareOp::EndsWith => {}
1050    }
1051}
1052
1053// Convert one parsed SQL numeric literal into the exact runtime `Value` variant
1054// required by the field kind when that conversion is lossless and unambiguous.
1055// This preserves strict equality semantics while still letting SQL express
1056// unsigned-width comparisons such as `Nat16`/`u64` fields.
1057fn canonicalize_strict_sql_numeric_value_for_kind(
1058    kind: &FieldKind,
1059    value: &Value,
1060) -> Option<Value> {
1061    match kind {
1062        FieldKind::Relation { key_kind, .. } => {
1063            canonicalize_strict_sql_numeric_value_for_kind(key_kind, value)
1064        }
1065        FieldKind::Int => match value {
1066            Value::Int(inner) => Some(Value::Int(*inner)),
1067            Value::Uint(inner) => i64::try_from(*inner).ok().map(Value::Int),
1068            _ => None,
1069        },
1070        FieldKind::Uint => match value {
1071            Value::Int(inner) => u64::try_from(*inner).ok().map(Value::Uint),
1072            Value::Uint(inner) => Some(Value::Uint(*inner)),
1073            _ => None,
1074        },
1075        FieldKind::Account
1076        | FieldKind::Blob
1077        | FieldKind::Bool
1078        | FieldKind::Date
1079        | FieldKind::Decimal { .. }
1080        | FieldKind::Duration
1081        | FieldKind::Enum { .. }
1082        | FieldKind::Float32
1083        | FieldKind::Float64
1084        | FieldKind::Int128
1085        | FieldKind::IntBig
1086        | FieldKind::List(_)
1087        | FieldKind::Map { .. }
1088        | FieldKind::Principal
1089        | FieldKind::Set(_)
1090        | FieldKind::Structured { .. }
1091        | FieldKind::Subaccount
1092        | FieldKind::Text
1093        | FieldKind::Timestamp
1094        | FieldKind::Uint128
1095        | FieldKind::UintBig
1096        | FieldKind::Ulid
1097        | FieldKind::Unit => None,
1098    }
1099}
1100
1101#[inline(never)]
1102pub(in crate::db) fn apply_lowered_select_shape(
1103    mut query: StructuralQuery,
1104    lowered: LoweredSelectShape,
1105) -> Result<StructuralQuery, SqlLoweringError> {
1106    let LoweredSelectShape {
1107        scalar_projection_fields,
1108        grouped_projection_aggregates,
1109        group_by_fields,
1110        distinct,
1111        having,
1112        predicate,
1113        order_by,
1114        limit,
1115        offset,
1116    } = lowered;
1117    let model = query.model();
1118
1119    // Phase 1: apply grouped declaration semantics.
1120    for field in group_by_fields {
1121        query = query.group_by(field)?;
1122    }
1123
1124    // Phase 2: apply scalar DISTINCT and projection contracts.
1125    if distinct {
1126        query = query.distinct();
1127    }
1128    if let Some(fields) = scalar_projection_fields {
1129        query = query.select_fields(fields);
1130    }
1131    for aggregate in grouped_projection_aggregates {
1132        query = query.aggregate(lower_aggregate_call(aggregate)?);
1133    }
1134
1135    // Phase 3: bind resolved HAVING clauses against grouped terminals.
1136    for clause in having {
1137        match clause {
1138            ResolvedHavingClause::GroupField { field, op, value } => {
1139                let value = model_field_kind(model, &field)
1140                    .and_then(|field_kind| {
1141                        canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &value)
1142                    })
1143                    .unwrap_or(value);
1144                query = query.having_group(field, op, value)?;
1145            }
1146            ResolvedHavingClause::Aggregate {
1147                aggregate_index,
1148                op,
1149                value,
1150            } => {
1151                query = query.having_aggregate(aggregate_index, op, value)?;
1152            }
1153        }
1154    }
1155
1156    // Phase 4: attach the shared filter/order/page tail through the base-query lane.
1157    Ok(apply_lowered_base_query_shape(
1158        query,
1159        LoweredBaseQueryShape {
1160            predicate: predicate
1161                .map(|predicate| canonicalize_sql_predicate_for_model(model, predicate)),
1162            order_by,
1163            limit,
1164            offset,
1165        },
1166    ))
1167}
1168
1169fn apply_lowered_base_query_shape(
1170    mut query: StructuralQuery,
1171    lowered: LoweredBaseQueryShape,
1172) -> StructuralQuery {
1173    if let Some(predicate) = lowered.predicate {
1174        query = query.filter(predicate);
1175    }
1176    query = apply_order_terms_structural(query, lowered.order_by);
1177    if let Some(limit) = lowered.limit {
1178        query = query.limit(limit);
1179    }
1180    if let Some(offset) = lowered.offset {
1181        query = query.offset(offset);
1182    }
1183
1184    query
1185}
1186
1187pub(in crate::db) fn bind_lowered_sql_query_structural(
1188    model: &'static crate::model::entity::EntityModel,
1189    lowered: LoweredSqlQuery,
1190    consistency: MissingRowPolicy,
1191) -> Result<StructuralQuery, SqlLoweringError> {
1192    match lowered {
1193        LoweredSqlQuery::Select(select) => {
1194            apply_lowered_select_shape(StructuralQuery::new(model, consistency), select)
1195        }
1196        LoweredSqlQuery::Delete(delete) => Ok(bind_lowered_sql_delete_query_structural(
1197            model,
1198            delete,
1199            consistency,
1200        )),
1201    }
1202}
1203
1204pub(in crate::db) fn bind_lowered_sql_delete_query_structural(
1205    model: &'static crate::model::entity::EntityModel,
1206    delete: LoweredBaseQueryShape,
1207    consistency: MissingRowPolicy,
1208) -> StructuralQuery {
1209    apply_lowered_base_query_shape(StructuralQuery::new(model, consistency).delete(), delete)
1210}
1211
1212pub(in crate::db) fn bind_lowered_sql_query<E: EntityKind>(
1213    lowered: LoweredSqlQuery,
1214    consistency: MissingRowPolicy,
1215) -> Result<Query<E>, SqlLoweringError> {
1216    let structural = bind_lowered_sql_query_structural(E::MODEL, lowered, consistency)?;
1217
1218    Ok(Query::from_inner(structural))
1219}
1220
1221fn bind_lowered_sql_global_aggregate_command<E: EntityKind>(
1222    lowered: LoweredSqlGlobalAggregateCommand,
1223    consistency: MissingRowPolicy,
1224) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
1225    let terminal = bind_lowered_sql_global_aggregate_terminal::<E>(lowered.terminal)?;
1226
1227    Ok(SqlGlobalAggregateCommand {
1228        query: Query::from_inner(apply_lowered_base_query_shape(
1229            StructuralQuery::new(E::MODEL, consistency),
1230            lowered.query,
1231        )),
1232        terminal,
1233    })
1234}
1235
1236fn bind_lowered_sql_global_aggregate_command_structural(
1237    model: &'static crate::model::entity::EntityModel,
1238    lowered: LoweredSqlGlobalAggregateCommand,
1239    consistency: MissingRowPolicy,
1240) -> SqlGlobalAggregateCommandCore {
1241    SqlGlobalAggregateCommandCore {
1242        query: apply_lowered_base_query_shape(
1243            StructuralQuery::new(model, consistency),
1244            lowered.query,
1245        ),
1246        terminal: lowered.terminal,
1247    }
1248}
1249
1250fn lower_global_aggregate_terminal(
1251    projection: SqlProjection,
1252) -> Result<SqlGlobalAggregateTerminal, SqlLoweringError> {
1253    let SqlProjection::Items(items) = projection else {
1254        return Err(SqlLoweringError::unsupported_select_projection());
1255    };
1256    if items.len() != 1 {
1257        return Err(SqlLoweringError::unsupported_select_projection());
1258    }
1259
1260    let Some(SqlSelectItem::Aggregate(aggregate)) = items.into_iter().next() else {
1261        return Err(SqlLoweringError::unsupported_select_projection());
1262    };
1263
1264    match lower_sql_aggregate_shape(aggregate)? {
1265        LoweredSqlAggregateShape::CountRows => Ok(SqlGlobalAggregateTerminal::CountRows),
1266        LoweredSqlAggregateShape::CountField(field) => {
1267            Ok(SqlGlobalAggregateTerminal::CountField(field))
1268        }
1269        LoweredSqlAggregateShape::FieldTarget {
1270            kind: SqlAggregateKind::Sum,
1271            field,
1272        } => Ok(SqlGlobalAggregateTerminal::SumField(field)),
1273        LoweredSqlAggregateShape::FieldTarget {
1274            kind: SqlAggregateKind::Avg,
1275            field,
1276        } => Ok(SqlGlobalAggregateTerminal::AvgField(field)),
1277        LoweredSqlAggregateShape::FieldTarget {
1278            kind: SqlAggregateKind::Min,
1279            field,
1280        } => Ok(SqlGlobalAggregateTerminal::MinField(field)),
1281        LoweredSqlAggregateShape::FieldTarget {
1282            kind: SqlAggregateKind::Max,
1283            field,
1284        } => Ok(SqlGlobalAggregateTerminal::MaxField(field)),
1285        LoweredSqlAggregateShape::FieldTarget {
1286            kind: SqlAggregateKind::Count,
1287            ..
1288        } => Err(SqlLoweringError::unsupported_select_projection()),
1289    }
1290}
1291
1292fn lower_sql_aggregate_shape(
1293    call: SqlAggregateCall,
1294) -> Result<LoweredSqlAggregateShape, SqlLoweringError> {
1295    match (call.kind, call.field) {
1296        (SqlAggregateKind::Count, None) => Ok(LoweredSqlAggregateShape::CountRows),
1297        (SqlAggregateKind::Count, Some(field)) => Ok(LoweredSqlAggregateShape::CountField(field)),
1298        (
1299            kind @ (SqlAggregateKind::Sum
1300            | SqlAggregateKind::Avg
1301            | SqlAggregateKind::Min
1302            | SqlAggregateKind::Max),
1303            Some(field),
1304        ) => Ok(LoweredSqlAggregateShape::FieldTarget { kind, field }),
1305        _ => Err(SqlLoweringError::unsupported_select_projection()),
1306    }
1307}
1308
1309fn grouped_projection_aggregate_calls(
1310    projection: &SqlProjection,
1311    group_by_fields: &[String],
1312) -> Result<Vec<SqlAggregateCall>, SqlLoweringError> {
1313    if group_by_fields.is_empty() {
1314        return Err(SqlLoweringError::unsupported_select_group_by());
1315    }
1316
1317    let SqlProjection::Items(items) = projection else {
1318        return Err(SqlLoweringError::unsupported_select_group_by());
1319    };
1320
1321    let mut projected_group_fields = Vec::<String>::new();
1322    let mut aggregate_calls = Vec::<SqlAggregateCall>::new();
1323    let mut seen_aggregate = false;
1324
1325    for item in items {
1326        match item {
1327            SqlSelectItem::Field(field) => {
1328                // Keep grouped projection deterministic and mappable to grouped
1329                // response contracts: group keys must be declared first.
1330                if seen_aggregate {
1331                    return Err(SqlLoweringError::unsupported_select_group_by());
1332                }
1333                projected_group_fields.push(field.clone());
1334            }
1335            SqlSelectItem::Aggregate(aggregate) => {
1336                seen_aggregate = true;
1337                aggregate_calls.push(aggregate.clone());
1338            }
1339            SqlSelectItem::TextFunction(_) => {
1340                return Err(SqlLoweringError::unsupported_select_group_by());
1341            }
1342        }
1343    }
1344
1345    if aggregate_calls.is_empty() || projected_group_fields.as_slice() != group_by_fields {
1346        return Err(SqlLoweringError::unsupported_select_group_by());
1347    }
1348
1349    Ok(aggregate_calls)
1350}
1351
1352fn lower_aggregate_call(
1353    call: SqlAggregateCall,
1354) -> Result<crate::db::query::builder::AggregateExpr, SqlLoweringError> {
1355    match lower_sql_aggregate_shape(call)? {
1356        LoweredSqlAggregateShape::CountRows => Ok(count()),
1357        LoweredSqlAggregateShape::CountField(field) => Ok(count_by(field)),
1358        LoweredSqlAggregateShape::FieldTarget {
1359            kind: SqlAggregateKind::Sum,
1360            field,
1361        } => Ok(sum(field)),
1362        LoweredSqlAggregateShape::FieldTarget {
1363            kind: SqlAggregateKind::Avg,
1364            field,
1365        } => Ok(avg(field)),
1366        LoweredSqlAggregateShape::FieldTarget {
1367            kind: SqlAggregateKind::Min,
1368            field,
1369        } => Ok(min_by(field)),
1370        LoweredSqlAggregateShape::FieldTarget {
1371            kind: SqlAggregateKind::Max,
1372            field,
1373        } => Ok(max_by(field)),
1374        LoweredSqlAggregateShape::FieldTarget {
1375            kind: SqlAggregateKind::Count,
1376            ..
1377        } => Err(SqlLoweringError::unsupported_select_projection()),
1378    }
1379}
1380
1381fn resolve_having_aggregate_index(
1382    target: &SqlAggregateCall,
1383    grouped_projection_aggregates: &[SqlAggregateCall],
1384) -> Result<usize, SqlLoweringError> {
1385    let mut matched = grouped_projection_aggregates
1386        .iter()
1387        .enumerate()
1388        .filter_map(|(index, aggregate)| (aggregate == target).then_some(index));
1389    let Some(index) = matched.next() else {
1390        return Err(SqlLoweringError::unsupported_select_having());
1391    };
1392    if matched.next().is_some() {
1393        return Err(SqlLoweringError::unsupported_select_having());
1394    }
1395
1396    Ok(index)
1397}
1398
1399fn lower_delete_shape(statement: SqlDeleteStatement) -> LoweredBaseQueryShape {
1400    let SqlDeleteStatement {
1401        predicate,
1402        order_by,
1403        limit,
1404        entity: _,
1405    } = statement;
1406
1407    LoweredBaseQueryShape {
1408        predicate,
1409        order_by,
1410        limit,
1411        offset: None,
1412    }
1413}
1414
1415fn apply_order_terms_structural(
1416    mut query: StructuralQuery,
1417    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1418) -> StructuralQuery {
1419    for term in order_by {
1420        query = match term.direction {
1421            SqlOrderDirection::Asc => query.order_by(term.field),
1422            SqlOrderDirection::Desc => query.order_by_desc(term.field),
1423        };
1424    }
1425
1426    query
1427}
1428
1429fn normalize_having_clauses(
1430    clauses: Vec<SqlHavingClause>,
1431    entity_scope: &[String],
1432) -> Vec<SqlHavingClause> {
1433    clauses
1434        .into_iter()
1435        .map(|clause| SqlHavingClause {
1436            symbol: normalize_having_symbol(clause.symbol, entity_scope),
1437            op: clause.op,
1438            value: clause.value,
1439        })
1440        .collect()
1441}
1442
1443fn normalize_having_symbol(symbol: SqlHavingSymbol, entity_scope: &[String]) -> SqlHavingSymbol {
1444    match symbol {
1445        SqlHavingSymbol::Field(field) => {
1446            SqlHavingSymbol::Field(normalize_identifier_to_scope(field, entity_scope))
1447        }
1448        SqlHavingSymbol::Aggregate(aggregate) => SqlHavingSymbol::Aggregate(
1449            normalize_aggregate_call_identifiers(aggregate, entity_scope),
1450        ),
1451    }
1452}
1453
1454fn normalize_aggregate_call_identifiers(
1455    aggregate: SqlAggregateCall,
1456    entity_scope: &[String],
1457) -> SqlAggregateCall {
1458    SqlAggregateCall {
1459        kind: aggregate.kind,
1460        field: aggregate
1461            .field
1462            .map(|field| normalize_identifier_to_scope(field, entity_scope)),
1463    }
1464}
1465
1466// Build one identifier scope used for reducing SQL-qualified field references
1467// (`entity.field`, `schema.entity.field`) into canonical planner field names.
1468fn sql_entity_scope_candidates(sql_entity: &str, expected_entity: &'static str) -> Vec<String> {
1469    let mut out = Vec::new();
1470    out.push(sql_entity.to_string());
1471    out.push(expected_entity.to_string());
1472
1473    if let Some(last) = identifier_last_segment(sql_entity) {
1474        out.push(last.to_string());
1475    }
1476    if let Some(last) = identifier_last_segment(expected_entity) {
1477        out.push(last.to_string());
1478    }
1479
1480    out
1481}
1482
1483fn normalize_projection_identifiers(
1484    projection: SqlProjection,
1485    entity_scope: &[String],
1486) -> SqlProjection {
1487    match projection {
1488        SqlProjection::All => SqlProjection::All,
1489        SqlProjection::Items(items) => SqlProjection::Items(
1490            items
1491                .into_iter()
1492                .map(|item| match item {
1493                    SqlSelectItem::Field(field) => {
1494                        SqlSelectItem::Field(normalize_identifier(field, entity_scope))
1495                    }
1496                    SqlSelectItem::Aggregate(aggregate) => {
1497                        SqlSelectItem::Aggregate(SqlAggregateCall {
1498                            kind: aggregate.kind,
1499                            field: aggregate
1500                                .field
1501                                .map(|field| normalize_identifier(field, entity_scope)),
1502                        })
1503                    }
1504                    SqlSelectItem::TextFunction(SqlTextFunctionCall {
1505                        function,
1506                        field,
1507                        literal,
1508                        literal2,
1509                        literal3,
1510                    }) => SqlSelectItem::TextFunction(SqlTextFunctionCall {
1511                        function,
1512                        field: normalize_identifier(field, entity_scope),
1513                        literal,
1514                        literal2,
1515                        literal3,
1516                    }),
1517                })
1518                .collect(),
1519        ),
1520    }
1521}
1522
1523fn normalize_order_terms(
1524    terms: Vec<crate::db::sql::parser::SqlOrderTerm>,
1525    entity_scope: &[String],
1526) -> Vec<crate::db::sql::parser::SqlOrderTerm> {
1527    terms
1528        .into_iter()
1529        .map(|term| crate::db::sql::parser::SqlOrderTerm {
1530            field: normalize_order_term_identifier(term.field, entity_scope),
1531            direction: term.direction,
1532        })
1533        .collect()
1534}
1535
1536fn normalize_order_term_identifier(identifier: String, entity_scope: &[String]) -> String {
1537    let Some(expression) = ExpressionOrderTerm::parse(identifier.as_str()) else {
1538        return normalize_identifier(identifier, entity_scope);
1539    };
1540    let normalized_field = normalize_identifier(expression.field().to_string(), entity_scope);
1541
1542    expression.canonical_text_with_field(normalized_field.as_str())
1543}
1544
1545fn normalize_identifier_list(fields: Vec<String>, entity_scope: &[String]) -> Vec<String> {
1546    fields
1547        .into_iter()
1548        .map(|field| normalize_identifier(field, entity_scope))
1549        .collect()
1550}
1551
1552// SQL lowering only adapts identifier qualification (`entity.field` -> `field`)
1553// and delegates predicate-tree traversal ownership to `db::predicate`.
1554fn adapt_predicate_identifiers_to_scope(
1555    predicate: Predicate,
1556    entity_scope: &[String],
1557) -> Predicate {
1558    rewrite_field_identifiers(predicate, |field| normalize_identifier(field, entity_scope))
1559}
1560
1561fn normalize_identifier(identifier: String, entity_scope: &[String]) -> String {
1562    normalize_identifier_to_scope(identifier, entity_scope)
1563}
1564
1565fn ensure_entity_matches_expected(
1566    sql_entity: &str,
1567    expected_entity: &'static str,
1568) -> Result<(), SqlLoweringError> {
1569    if identifiers_tail_match(sql_entity, expected_entity) {
1570        return Ok(());
1571    }
1572
1573    Err(SqlLoweringError::entity_mismatch(
1574        sql_entity,
1575        expected_entity,
1576    ))
1577}