// Source path: icydb_core/db/sql/lowering/mod.rs

//! Module: db::sql::lowering
//! Responsibility: reduced SQL statement lowering into canonical query intent.
//! Does not own: SQL tokenization/parsing, planner validation policy, or executor semantics.
//! Boundary: frontend-only translation from parsed SQL statement contracts to `Query<E>`.
5
///
/// TESTS
///
#[cfg(test)]
mod tests;
12
13use crate::{
14    db::{
15        predicate::{CoercionId, CompareOp, MissingRowPolicy, Predicate},
16        query::{
17            builder::aggregate::{avg, count, count_by, max_by, min_by, sum},
18            intent::{Query, QueryError, StructuralQuery},
19            plan::{
20                AggregateKind, ExpressionOrderTerm, FieldSlot, resolve_aggregate_target_field_slot,
21            },
22        },
23        sql::identifier::{
24            identifier_last_segment, identifiers_tail_match, normalize_identifier_to_scope,
25            rewrite_field_identifiers,
26        },
27        sql::parser::{
28            SqlAggregateCall, SqlAggregateKind, SqlDeleteStatement, SqlExplainMode,
29            SqlExplainStatement, SqlExplainTarget, SqlHavingClause, SqlHavingSymbol,
30            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
31            SqlStatement, SqlTextFunctionCall,
32        },
33    },
34    model::{entity::EntityModel, field::FieldKind},
35    traits::EntityKind,
36    value::Value,
37};
38use thiserror::Error as ThisError;
39
40///
41/// LoweredSqlCommand
42///
43/// Generic-free SQL command shape after reduced SQL parsing and entity-route
44/// normalization.
45/// This keeps statement-shape lowering shared across entities before typed
46/// `Query<E>` binding happens at the execution boundary.
47///
48#[derive(Clone, Debug)]
49pub struct LoweredSqlCommand(LoweredSqlCommandInner);
50
51#[derive(Clone, Debug)]
52enum LoweredSqlCommandInner {
53    Query(LoweredSqlQuery),
54    Explain {
55        mode: SqlExplainMode,
56        query: LoweredSqlQuery,
57    },
58    ExplainGlobalAggregate {
59        mode: SqlExplainMode,
60        command: LoweredSqlGlobalAggregateCommand,
61    },
62    DescribeEntity,
63    ShowIndexesEntity,
64    ShowColumnsEntity,
65    ShowEntities,
66}
67
///
/// SqlCommand
///
/// Test-only typed SQL command shell over the shared lowered SQL surface.
/// Runtime dispatch now consumes `LoweredSqlCommand` directly, but lowering
/// tests still validate typed binding behavior on this local envelope.
/// Variants mirror `LoweredSqlCommandInner` one-to-one, with lowered payloads
/// replaced by their typed `Query<E>` / `SqlGlobalAggregateCommand<E>` forms.
///
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum SqlCommand<E: EntityKind> {
    /// Typed executable query.
    Query(Query<E>),
    /// EXPLAIN over one typed query in the given explain mode.
    Explain {
        mode: SqlExplainMode,
        query: Query<E>,
    },
    /// EXPLAIN over one typed global aggregate command.
    ExplainGlobalAggregate {
        mode: SqlExplainMode,
        command: SqlGlobalAggregateCommand<E>,
    },
    /// Payload-free describe command.
    DescribeEntity,
    /// Payload-free show-indexes command.
    ShowIndexesEntity,
    /// Payload-free show-columns command.
    ShowColumnsEntity,
    /// Payload-free show-entities command.
    ShowEntities,
}
92
93impl LoweredSqlCommand {
94    #[must_use]
95    pub(in crate::db) const fn query(&self) -> Option<&LoweredSqlQuery> {
96        match &self.0 {
97            LoweredSqlCommandInner::Query(query) => Some(query),
98            LoweredSqlCommandInner::Explain { .. }
99            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
100            | LoweredSqlCommandInner::DescribeEntity
101            | LoweredSqlCommandInner::ShowIndexesEntity
102            | LoweredSqlCommandInner::ShowColumnsEntity
103            | LoweredSqlCommandInner::ShowEntities => None,
104        }
105    }
106
107    #[must_use]
108    pub(in crate::db) const fn explain_query(&self) -> Option<(SqlExplainMode, &LoweredSqlQuery)> {
109        match &self.0 {
110            LoweredSqlCommandInner::Explain { mode, query } => Some((*mode, query)),
111            LoweredSqlCommandInner::Query(_)
112            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
113            | LoweredSqlCommandInner::DescribeEntity
114            | LoweredSqlCommandInner::ShowIndexesEntity
115            | LoweredSqlCommandInner::ShowColumnsEntity
116            | LoweredSqlCommandInner::ShowEntities => None,
117        }
118    }
119}
120
121///
122/// LoweredSqlQuery
123///
124/// Generic-free executable SQL query shape prepared before typed query binding.
125/// Select and delete lowering stay shared until the final `Query<E>` build.
126///
127#[derive(Clone, Debug)]
128pub(crate) enum LoweredSqlQuery {
129    Select(LoweredSelectShape),
130    Delete(LoweredBaseQueryShape),
131}
132
133impl LoweredSqlQuery {
134    // Report whether this lowered query carries grouped execution semantics.
135    pub(crate) const fn has_grouping(&self) -> bool {
136        match self {
137            Self::Select(select) => select.has_grouping(),
138            Self::Delete(_) => false,
139        }
140    }
141}
142
///
/// SqlGlobalAggregateTerminal
///
/// Global SQL aggregate terminals currently executable through dedicated
/// aggregate SQL entrypoints.
/// Field-target variants carry the field name as written; it is resolved to a
/// planner field slot later, once a concrete model is available.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum SqlGlobalAggregateTerminal {
    /// Count over rows; carries no target field.
    CountRows,
    /// Count targeting the named field.
    CountField(String),
    /// Sum over the named field.
    SumField(String),
    /// Average over the named field.
    AvgField(String),
    /// Minimum over the named field.
    MinField(String),
    /// Maximum over the named field.
    MaxField(String),
}
159
160///
161/// TypedSqlGlobalAggregateTerminal
162///
163/// TypedSqlGlobalAggregateTerminal is the typed global aggregate contract used
164/// after entity binding resolves one concrete model.
165/// Field-target variants carry a resolved planner field slot so typed SQL
166/// aggregate execution does not re-resolve the same field name before dispatch.
167///
168#[derive(Clone, Debug, Eq, PartialEq)]
169pub(crate) enum TypedSqlGlobalAggregateTerminal {
170    CountRows,
171    CountField(FieldSlot),
172    SumField(FieldSlot),
173    AvgField(FieldSlot),
174    MinField(FieldSlot),
175    MaxField(FieldSlot),
176}
177
///
/// PreparedSqlScalarAggregateDomain
///
/// Typed SQL scalar aggregate execution domain selected before session runtime
/// dispatch. This keeps the typed aggregate lane explicit about which internal
/// execution family will consume the request.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateDomain {
    /// Selected for the row-count terminal.
    ExistingRows,
    /// Selected for the field-count terminal.
    ProjectionField,
    /// Selected for the sum and average terminals.
    NumericField,
    /// Selected for the minimum and maximum terminals.
    ScalarExtremaValue,
}
190
///
/// PreparedSqlScalarAggregateOrderingRequirement
///
/// Ordering sensitivity required by the selected typed SQL scalar aggregate
/// strategy. This keeps first-slice descriptor/explain consumers off local
/// kind checks when they need to know whether field order semantics matter.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateOrderingRequirement {
    /// No ordering sensitivity (count, sum, and average strategies).
    None,
    /// Field order semantics matter (minimum and maximum strategies).
    FieldOrder,
}
201
///
/// PreparedSqlScalarAggregateRowSource
///
/// Canonical row-source shape for one prepared typed SQL scalar aggregate
/// strategy. This describes what kind of row-derived data the execution family
/// ultimately consumes.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateRowSource {
    /// Selected for the row-count strategy.
    ExistingRows,
    /// Selected for the field-count strategy.
    ProjectedField,
    /// Selected for the sum and average strategies.
    NumericField,
    /// Selected for the minimum and maximum strategies.
    ExtremalWinnerField,
}
214
///
/// PreparedSqlScalarAggregateEmptySetBehavior
///
/// Canonical empty-window result behavior for one prepared typed SQL scalar
/// aggregate strategy.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateEmptySetBehavior {
    /// Selected for both count strategies.
    Zero,
    /// Selected for the sum, average, minimum, and maximum strategies.
    Null,
}
224
///
/// PreparedSqlScalarAggregateDescriptorShape
///
/// Stable typed SQL scalar aggregate descriptor shape derived once at the SQL
/// aggregate preparation boundary and reused by runtime/EXPLAIN projections.
/// There is one shape per global aggregate terminal variant.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateDescriptorShape {
    /// Shape of the row-count terminal.
    CountRows,
    /// Shape of the field-count terminal.
    CountField,
    /// Shape of the sum terminal.
    SumField,
    /// Shape of the average terminal.
    AvgField,
    /// Shape of the minimum terminal.
    MinField,
    /// Shape of the maximum terminal.
    MaxField,
}
238
239/// PreparedSqlScalarAggregateRuntimeDescriptor
240///
241/// Stable runtime-family projection for one prepared typed SQL scalar
242/// aggregate strategy.
243/// Session SQL aggregate execution consumes this descriptor instead of
244/// rebuilding runtime boundary choice from raw SQL terminal variants or
245/// parallel metadata tuple matches.
246#[derive(Clone, Copy, Debug, Eq, PartialEq)]
247pub(crate) enum PreparedSqlScalarAggregateRuntimeDescriptor {
248    CountRows,
249    CountField,
250    NumericField { kind: AggregateKind },
251    ExtremalWinnerField { kind: AggregateKind },
252}
253
254///
255/// PreparedSqlScalarAggregateStrategy
256///
257/// PreparedSqlScalarAggregateStrategy is the single typed SQL scalar aggregate
258/// behavior source for the first `0.71` slice.
259/// It resolves aggregate domain, descriptor shape, target-slot ownership, and
260/// runtime behavior once so runtime and EXPLAIN do not re-derive that
261/// behavior from raw SQL terminal variants.
262/// Explain-visible aggregate expressions are projected on demand from this
263/// prepared strategy instead of being carried as owned execution metadata.
264///
265#[derive(Clone, Debug, Eq, PartialEq)]
266pub(crate) struct PreparedSqlScalarAggregateStrategy {
267    target_slot: Option<FieldSlot>,
268    domain: PreparedSqlScalarAggregateDomain,
269    ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
270    row_source: PreparedSqlScalarAggregateRowSource,
271    empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
272    descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
273}
274
275impl PreparedSqlScalarAggregateStrategy {
276    const fn new(
277        target_slot: Option<FieldSlot>,
278        domain: PreparedSqlScalarAggregateDomain,
279        ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
280        row_source: PreparedSqlScalarAggregateRowSource,
281        empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
282        descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
283    ) -> Self {
284        Self {
285            target_slot,
286            domain,
287            ordering_requirement,
288            row_source,
289            empty_set_behavior,
290            descriptor_shape,
291        }
292    }
293
294    fn from_typed_terminal(terminal: &TypedSqlGlobalAggregateTerminal) -> Self {
295        match terminal {
296            TypedSqlGlobalAggregateTerminal::CountRows => Self::new(
297                None,
298                PreparedSqlScalarAggregateDomain::ExistingRows,
299                PreparedSqlScalarAggregateOrderingRequirement::None,
300                PreparedSqlScalarAggregateRowSource::ExistingRows,
301                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
302                PreparedSqlScalarAggregateDescriptorShape::CountRows,
303            ),
304            TypedSqlGlobalAggregateTerminal::CountField(target_slot) => Self::new(
305                Some(target_slot.clone()),
306                PreparedSqlScalarAggregateDomain::ProjectionField,
307                PreparedSqlScalarAggregateOrderingRequirement::None,
308                PreparedSqlScalarAggregateRowSource::ProjectedField,
309                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
310                PreparedSqlScalarAggregateDescriptorShape::CountField,
311            ),
312            TypedSqlGlobalAggregateTerminal::SumField(target_slot) => Self::new(
313                Some(target_slot.clone()),
314                PreparedSqlScalarAggregateDomain::NumericField,
315                PreparedSqlScalarAggregateOrderingRequirement::None,
316                PreparedSqlScalarAggregateRowSource::NumericField,
317                PreparedSqlScalarAggregateEmptySetBehavior::Null,
318                PreparedSqlScalarAggregateDescriptorShape::SumField,
319            ),
320            TypedSqlGlobalAggregateTerminal::AvgField(target_slot) => Self::new(
321                Some(target_slot.clone()),
322                PreparedSqlScalarAggregateDomain::NumericField,
323                PreparedSqlScalarAggregateOrderingRequirement::None,
324                PreparedSqlScalarAggregateRowSource::NumericField,
325                PreparedSqlScalarAggregateEmptySetBehavior::Null,
326                PreparedSqlScalarAggregateDescriptorShape::AvgField,
327            ),
328            TypedSqlGlobalAggregateTerminal::MinField(target_slot) => Self::new(
329                Some(target_slot.clone()),
330                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
331                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
332                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
333                PreparedSqlScalarAggregateEmptySetBehavior::Null,
334                PreparedSqlScalarAggregateDescriptorShape::MinField,
335            ),
336            TypedSqlGlobalAggregateTerminal::MaxField(target_slot) => Self::new(
337                Some(target_slot.clone()),
338                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
339                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
340                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
341                PreparedSqlScalarAggregateEmptySetBehavior::Null,
342                PreparedSqlScalarAggregateDescriptorShape::MaxField,
343            ),
344        }
345    }
346
347    fn from_lowered_terminal_with_model(
348        model: &'static EntityModel,
349        terminal: &SqlGlobalAggregateTerminal,
350    ) -> Result<Self, SqlLoweringError> {
351        let resolve_target_slot = |field: &str| {
352            resolve_aggregate_target_field_slot(model, field).map_err(SqlLoweringError::from)
353        };
354
355        match terminal {
356            SqlGlobalAggregateTerminal::CountRows => Ok(Self::new(
357                None,
358                PreparedSqlScalarAggregateDomain::ExistingRows,
359                PreparedSqlScalarAggregateOrderingRequirement::None,
360                PreparedSqlScalarAggregateRowSource::ExistingRows,
361                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
362                PreparedSqlScalarAggregateDescriptorShape::CountRows,
363            )),
364            SqlGlobalAggregateTerminal::CountField(field) => {
365                let target_slot = resolve_target_slot(field.as_str())?;
366
367                Ok(Self::new(
368                    Some(target_slot),
369                    PreparedSqlScalarAggregateDomain::ProjectionField,
370                    PreparedSqlScalarAggregateOrderingRequirement::None,
371                    PreparedSqlScalarAggregateRowSource::ProjectedField,
372                    PreparedSqlScalarAggregateEmptySetBehavior::Zero,
373                    PreparedSqlScalarAggregateDescriptorShape::CountField,
374                ))
375            }
376            SqlGlobalAggregateTerminal::SumField(field) => {
377                let target_slot = resolve_target_slot(field.as_str())?;
378
379                Ok(Self::new(
380                    Some(target_slot),
381                    PreparedSqlScalarAggregateDomain::NumericField,
382                    PreparedSqlScalarAggregateOrderingRequirement::None,
383                    PreparedSqlScalarAggregateRowSource::NumericField,
384                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
385                    PreparedSqlScalarAggregateDescriptorShape::SumField,
386                ))
387            }
388            SqlGlobalAggregateTerminal::AvgField(field) => {
389                let target_slot = resolve_target_slot(field.as_str())?;
390
391                Ok(Self::new(
392                    Some(target_slot),
393                    PreparedSqlScalarAggregateDomain::NumericField,
394                    PreparedSqlScalarAggregateOrderingRequirement::None,
395                    PreparedSqlScalarAggregateRowSource::NumericField,
396                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
397                    PreparedSqlScalarAggregateDescriptorShape::AvgField,
398                ))
399            }
400            SqlGlobalAggregateTerminal::MinField(field) => {
401                let target_slot = resolve_target_slot(field.as_str())?;
402
403                Ok(Self::new(
404                    Some(target_slot),
405                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
406                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
407                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
408                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
409                    PreparedSqlScalarAggregateDescriptorShape::MinField,
410                ))
411            }
412            SqlGlobalAggregateTerminal::MaxField(field) => {
413                let target_slot = resolve_target_slot(field.as_str())?;
414
415                Ok(Self::new(
416                    Some(target_slot),
417                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
418                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
419                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
420                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
421                    PreparedSqlScalarAggregateDescriptorShape::MaxField,
422                ))
423            }
424        }
425    }
426
427    /// Borrow the resolved target slot when this prepared SQL scalar strategy is field-targeted.
428    #[must_use]
429    pub(crate) const fn target_slot(&self) -> Option<&FieldSlot> {
430        self.target_slot.as_ref()
431    }
432
433    /// Return the canonical typed SQL scalar aggregate domain.
434    #[cfg(test)]
435    #[must_use]
436    pub(crate) const fn domain(&self) -> PreparedSqlScalarAggregateDomain {
437        self.domain
438    }
439
440    /// Return the stable descriptor/runtime shape label for this prepared strategy.
441    #[cfg(test)]
442    #[must_use]
443    pub(crate) const fn descriptor_shape(&self) -> PreparedSqlScalarAggregateDescriptorShape {
444        self.descriptor_shape
445    }
446
447    /// Return the stable runtime-family projection for this prepared SQL
448    /// scalar aggregate strategy.
449    #[must_use]
450    pub(crate) const fn runtime_descriptor(&self) -> PreparedSqlScalarAggregateRuntimeDescriptor {
451        match self.descriptor_shape {
452            PreparedSqlScalarAggregateDescriptorShape::CountRows => {
453                PreparedSqlScalarAggregateRuntimeDescriptor::CountRows
454            }
455            PreparedSqlScalarAggregateDescriptorShape::CountField => {
456                PreparedSqlScalarAggregateRuntimeDescriptor::CountField
457            }
458            PreparedSqlScalarAggregateDescriptorShape::SumField => {
459                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
460                    kind: AggregateKind::Sum,
461                }
462            }
463            PreparedSqlScalarAggregateDescriptorShape::AvgField => {
464                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
465                    kind: AggregateKind::Avg,
466                }
467            }
468            PreparedSqlScalarAggregateDescriptorShape::MinField => {
469                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
470                    kind: AggregateKind::Min,
471                }
472            }
473            PreparedSqlScalarAggregateDescriptorShape::MaxField => {
474                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
475                    kind: AggregateKind::Max,
476                }
477            }
478        }
479    }
480
481    /// Return the canonical aggregate kind for this prepared SQL scalar strategy.
482    #[must_use]
483    pub(crate) const fn aggregate_kind(&self) -> AggregateKind {
484        match self.descriptor_shape {
485            PreparedSqlScalarAggregateDescriptorShape::CountRows
486            | PreparedSqlScalarAggregateDescriptorShape::CountField => AggregateKind::Count,
487            PreparedSqlScalarAggregateDescriptorShape::SumField => AggregateKind::Sum,
488            PreparedSqlScalarAggregateDescriptorShape::AvgField => AggregateKind::Avg,
489            PreparedSqlScalarAggregateDescriptorShape::MinField => AggregateKind::Min,
490            PreparedSqlScalarAggregateDescriptorShape::MaxField => AggregateKind::Max,
491        }
492    }
493
494    /// Return the projected field label for descriptor/explain projection when
495    /// this prepared strategy is field-targeted.
496    #[must_use]
497    pub(crate) fn projected_field(&self) -> Option<&str> {
498        self.target_slot().map(FieldSlot::field)
499    }
500
501    /// Return field-order sensitivity for this prepared SQL scalar aggregate strategy.
502    #[cfg(test)]
503    #[must_use]
504    pub(crate) const fn ordering_requirement(
505        &self,
506    ) -> PreparedSqlScalarAggregateOrderingRequirement {
507        self.ordering_requirement
508    }
509
510    /// Return the canonical row-source shape for this prepared strategy.
511    #[cfg(test)]
512    #[must_use]
513    pub(crate) const fn row_source(&self) -> PreparedSqlScalarAggregateRowSource {
514        self.row_source
515    }
516
517    /// Return empty-window behavior for this prepared SQL scalar aggregate strategy.
518    #[cfg(test)]
519    #[must_use]
520    pub(crate) const fn empty_set_behavior(&self) -> PreparedSqlScalarAggregateEmptySetBehavior {
521        self.empty_set_behavior
522    }
523}
524
525///
526/// LoweredSqlGlobalAggregateCommand
527///
528/// Generic-free global aggregate command shape prepared before typed query
529/// binding.
530/// This keeps aggregate SQL lowering shared across entities until the final
531/// execution boundary converts the base query shape into `Query<E>`.
532///
533#[derive(Clone, Debug)]
534pub(crate) struct LoweredSqlGlobalAggregateCommand {
535    query: LoweredBaseQueryShape,
536    terminal: SqlGlobalAggregateTerminal,
537}
538
539///
540/// LoweredSqlAggregateShape
541///
542/// Locally validated aggregate-call shape used by SQL lowering to avoid
543/// duplicating `(SqlAggregateKind, field)` validation across lowering lanes.
544///
545
546enum LoweredSqlAggregateShape {
547    CountRows,
548    CountField(String),
549    FieldTarget {
550        kind: SqlAggregateKind,
551        field: String,
552    },
553}
554
555///
556/// SqlGlobalAggregateCommand
557///
558/// Lowered global SQL aggregate command carrying base query shape plus terminal.
559///
560
561#[derive(Debug)]
562pub(crate) struct SqlGlobalAggregateCommand<E: EntityKind> {
563    query: Query<E>,
564    terminal: TypedSqlGlobalAggregateTerminal,
565}
566
567impl<E: EntityKind> SqlGlobalAggregateCommand<E> {
568    /// Borrow the lowered base query shape for aggregate execution.
569    #[must_use]
570    pub(crate) const fn query(&self) -> &Query<E> {
571        &self.query
572    }
573
574    /// Borrow the lowered aggregate terminal.
575    #[cfg(test)]
576    #[must_use]
577    pub(crate) const fn terminal(&self) -> &TypedSqlGlobalAggregateTerminal {
578        &self.terminal
579    }
580
581    /// Prepare one typed SQL scalar aggregate strategy from the typed command boundary.
582    #[must_use]
583    pub(crate) fn prepared_scalar_strategy(&self) -> PreparedSqlScalarAggregateStrategy {
584        PreparedSqlScalarAggregateStrategy::from_typed_terminal(&self.terminal)
585    }
586}
587
588///
589/// SqlGlobalAggregateCommandCore
590///
591/// Generic-free lowered global aggregate command bound onto the structural
592/// query surface.
593/// This keeps global aggregate EXPLAIN on the shared query/explain path until
594/// a typed boundary is strictly required.
595///
596
597#[derive(Debug)]
598pub(crate) struct SqlGlobalAggregateCommandCore {
599    query: StructuralQuery,
600    terminal: SqlGlobalAggregateTerminal,
601}
602
603impl SqlGlobalAggregateCommandCore {
604    /// Borrow the structural query payload for aggregate explain/execution.
605    #[must_use]
606    pub(in crate::db) const fn query(&self) -> &StructuralQuery {
607        &self.query
608    }
609
610    /// Prepare one structural SQL scalar aggregate strategy using one concrete model.
611    pub(in crate::db) fn prepared_scalar_strategy_with_model(
612        &self,
613        model: &'static EntityModel,
614    ) -> Result<PreparedSqlScalarAggregateStrategy, SqlLoweringError> {
615        PreparedSqlScalarAggregateStrategy::from_lowered_terminal_with_model(model, &self.terminal)
616    }
617}
618
619///
620/// SqlLoweringError
621///
622/// SQL frontend lowering failures before planner validation/execution.
623///
624
625#[derive(Debug, ThisError)]
626pub(crate) enum SqlLoweringError {
627    #[error("{0}")]
628    Parse(#[from] crate::db::sql::parser::SqlParseError),
629
630    #[error("{0}")]
631    Query(#[from] QueryError),
632
633    #[error("SQL entity '{sql_entity}' does not match requested entity type '{expected_entity}'")]
634    EntityMismatch {
635        sql_entity: String,
636        expected_entity: &'static str,
637    },
638
639    #[error(
640        "unsupported SQL SELECT projection; supported forms are SELECT *, field lists, or grouped aggregate shapes"
641    )]
642    UnsupportedSelectProjection,
643
644    #[error("unsupported SQL SELECT DISTINCT")]
645    UnsupportedSelectDistinct,
646
647    #[error("unsupported SQL GROUP BY projection shape")]
648    UnsupportedSelectGroupBy,
649
650    #[error("unsupported SQL HAVING shape")]
651    UnsupportedSelectHaving,
652}
653
654impl SqlLoweringError {
655    /// Construct one entity-mismatch SQL lowering error.
656    fn entity_mismatch(sql_entity: impl Into<String>, expected_entity: &'static str) -> Self {
657        Self::EntityMismatch {
658            sql_entity: sql_entity.into(),
659            expected_entity,
660        }
661    }
662
663    /// Construct one unsupported SELECT projection SQL lowering error.
664    const fn unsupported_select_projection() -> Self {
665        Self::UnsupportedSelectProjection
666    }
667
668    /// Construct one unsupported SELECT DISTINCT SQL lowering error.
669    const fn unsupported_select_distinct() -> Self {
670        Self::UnsupportedSelectDistinct
671    }
672
673    /// Construct one unsupported SELECT GROUP BY shape SQL lowering error.
674    const fn unsupported_select_group_by() -> Self {
675        Self::UnsupportedSelectGroupBy
676    }
677
678    /// Construct one unsupported SELECT HAVING shape SQL lowering error.
679    const fn unsupported_select_having() -> Self {
680        Self::UnsupportedSelectHaving
681    }
682}
683
684///
685/// PreparedSqlStatement
686///
687/// SQL statement envelope after entity-scope normalization and
688/// entity-match validation for one target entity descriptor.
689///
690/// This pre-lowering contract is entity-agnostic and reusable across
691/// dynamic SQL route branches before typed `Query<E>` binding.
692///
693
694#[derive(Clone, Debug)]
695pub(crate) struct PreparedSqlStatement {
696    statement: SqlStatement,
697}
698
///
/// LoweredSqlLaneKind
///
/// Coarse lane classification for one lowered SQL command.
/// Both EXPLAIN command shapes collapse onto `Explain`; every other lowered
/// command shape maps to its own lane.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum LoweredSqlLaneKind {
    /// Executable query command.
    Query,
    /// Either EXPLAIN command shape.
    Explain,
    /// Describe-entity command.
    Describe,
    /// Show-indexes command.
    ShowIndexes,
    /// Show-columns command.
    ShowColumns,
    /// Show-entities command.
    ShowEntities,
}
708
/// Parse and lower one SQL statement into canonical query intent for `E`.
///
/// Test-only convenience: parses `sql`, then delegates to
/// `compile_sql_command_from_statement`.
///
/// # Errors
///
/// Propagates parse failures and any lowering/binding error from the
/// delegated call.
#[cfg(test)]
pub(crate) fn compile_sql_command<E: EntityKind>(
    sql: &str,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let statement = crate::db::sql::parser::parse_sql(sql)?;
    compile_sql_command_from_statement::<E>(statement, consistency)
}
718
/// Lower one parsed SQL statement into canonical query intent for `E`.
///
/// Test-only: prepares the statement against `E`'s model name, then delegates
/// to `compile_sql_command_from_prepared_statement`.
///
/// # Errors
///
/// Propagates preparation failures and any lowering/binding error from the
/// delegated call.
#[cfg(test)]
pub(crate) fn compile_sql_command_from_statement<E: EntityKind>(
    statement: SqlStatement,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let prepared = prepare_sql_statement(statement, E::MODEL.name())?;
    compile_sql_command_from_prepared_statement::<E>(prepared, consistency)
}
728
/// Lower one prepared SQL statement into canonical query intent for `E`.
///
/// Test-only: lowers to the shared generic-free command using `E`'s primary
/// key field name, then binds the result onto the typed `SqlCommand<E>`.
///
/// # Errors
///
/// Propagates lowering and binding failures.
#[cfg(test)]
pub(crate) fn compile_sql_command_from_prepared_statement<E: EntityKind>(
    prepared: PreparedSqlStatement,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let lowered = lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)?;

    bind_lowered_sql_command::<E>(lowered, consistency)
}
739
740/// Lower one prepared SQL statement into one shared generic-free command shape.
741#[inline(never)]
742pub(crate) fn lower_sql_command_from_prepared_statement(
743    prepared: PreparedSqlStatement,
744    primary_key_field: &str,
745) -> Result<LoweredSqlCommand, SqlLoweringError> {
746    lower_prepared_statement(prepared.statement, primary_key_field)
747}
748
749pub(crate) const fn lowered_sql_command_lane(command: &LoweredSqlCommand) -> LoweredSqlLaneKind {
750    match command.0 {
751        LoweredSqlCommandInner::Query(_) => LoweredSqlLaneKind::Query,
752        LoweredSqlCommandInner::Explain { .. }
753        | LoweredSqlCommandInner::ExplainGlobalAggregate { .. } => LoweredSqlLaneKind::Explain,
754        LoweredSqlCommandInner::DescribeEntity => LoweredSqlLaneKind::Describe,
755        LoweredSqlCommandInner::ShowIndexesEntity => LoweredSqlLaneKind::ShowIndexes,
756        LoweredSqlCommandInner::ShowColumnsEntity => LoweredSqlLaneKind::ShowColumns,
757        LoweredSqlCommandInner::ShowEntities => LoweredSqlLaneKind::ShowEntities,
758    }
759}
760
761/// Return whether one parsed SQL statement is an executable constrained global
762/// aggregate shape owned by the dedicated aggregate lane.
763pub(in crate::db) fn is_sql_global_aggregate_statement(statement: &SqlStatement) -> bool {
764    let SqlStatement::Select(statement) = statement else {
765        return false;
766    };
767
768    is_sql_global_aggregate_select(statement)
769}
770
771// Detect one constrained global aggregate select shape without widening any
772// non-aggregate SQL surface onto the dedicated aggregate execution lane.
773fn is_sql_global_aggregate_select(statement: &SqlSelectStatement) -> bool {
774    if statement.distinct || !statement.group_by.is_empty() || !statement.having.is_empty() {
775        return false;
776    }
777
778    lower_global_aggregate_terminal(statement.projection.clone()).is_ok()
779}
780
781/// Bind one lowered global aggregate EXPLAIN shape onto the structural query
782/// surface when the explain command carries that specialized form.
783pub(crate) fn bind_lowered_sql_explain_global_aggregate_structural(
784    lowered: &LoweredSqlCommand,
785    model: &'static crate::model::entity::EntityModel,
786    consistency: MissingRowPolicy,
787) -> Option<(SqlExplainMode, SqlGlobalAggregateCommandCore)> {
788    let LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } = &lowered.0 else {
789        return None;
790    };
791
792    Some((
793        *mode,
794        bind_lowered_sql_global_aggregate_command_structural(model, command.clone(), consistency),
795    ))
796}
797
/// Bind one shared generic-free SQL command shape to the typed query surface.
///
/// Test-only. Query-bearing variants are bound through the typed query and
/// global-aggregate binders; payload-free variants map straight across.
///
/// # Errors
///
/// Propagates any binding failure from the query or aggregate binders.
#[cfg(test)]
pub(crate) fn bind_lowered_sql_command<E: EntityKind>(
    lowered: LoweredSqlCommand,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    match lowered.0 {
        LoweredSqlCommandInner::Query(query) => Ok(SqlCommand::Query(bind_lowered_sql_query::<E>(
            query,
            consistency,
        )?)),
        LoweredSqlCommandInner::Explain { mode, query } => Ok(SqlCommand::Explain {
            mode,
            query: bind_lowered_sql_query::<E>(query, consistency)?,
        }),
        LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } => {
            Ok(SqlCommand::ExplainGlobalAggregate {
                mode,
                command: bind_lowered_sql_global_aggregate_command::<E>(command, consistency)?,
            })
        }
        LoweredSqlCommandInner::DescribeEntity => Ok(SqlCommand::DescribeEntity),
        LoweredSqlCommandInner::ShowIndexesEntity => Ok(SqlCommand::ShowIndexesEntity),
        LoweredSqlCommandInner::ShowColumnsEntity => Ok(SqlCommand::ShowColumnsEntity),
        LoweredSqlCommandInner::ShowEntities => Ok(SqlCommand::ShowEntities),
    }
}
825
826/// Prepare one parsed SQL statement for one expected entity route.
827#[inline(never)]
828pub(crate) fn prepare_sql_statement(
829    statement: SqlStatement,
830    expected_entity: &'static str,
831) -> Result<PreparedSqlStatement, SqlLoweringError> {
832    let statement = prepare_statement(statement, expected_entity)?;
833
834    Ok(PreparedSqlStatement { statement })
835}
836
/// Parse, prepare, and lower one SQL string into the global aggregate
/// execution command for entity `E`.
///
/// Parse, preparation, and lowering failures all surface as `SqlLoweringError`.
#[cfg(test)]
pub(crate) fn compile_sql_global_aggregate_command<E: EntityKind>(
    sql: &str,
    consistency: MissingRowPolicy,
) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
    let parsed = crate::db::sql::parser::parse_sql(sql)?;
    let prepared_statement = prepare_sql_statement(parsed, E::MODEL.name())?;

    compile_sql_global_aggregate_command_from_prepared::<E>(prepared_statement, consistency)
}
847
848// Lower one already-prepared SQL statement into the constrained global
849// aggregate command envelope so callers that already parsed and routed the
850// statement do not pay the parser again.
851pub(crate) fn compile_sql_global_aggregate_command_from_prepared<E: EntityKind>(
852    prepared: PreparedSqlStatement,
853    consistency: MissingRowPolicy,
854) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
855    let SqlStatement::Select(statement) = prepared.statement else {
856        return Err(SqlLoweringError::unsupported_select_projection());
857    };
858
859    bind_lowered_sql_global_aggregate_command::<E>(
860        lower_global_aggregate_select_shape(statement)?,
861        consistency,
862    )
863}
864
865fn bind_lowered_sql_global_aggregate_terminal<E: EntityKind>(
866    terminal: SqlGlobalAggregateTerminal,
867) -> Result<TypedSqlGlobalAggregateTerminal, SqlLoweringError> {
868    let resolve_target_slot = |field: &str| {
869        resolve_aggregate_target_field_slot(E::MODEL, field).map_err(SqlLoweringError::from)
870    };
871
872    match terminal {
873        SqlGlobalAggregateTerminal::CountRows => Ok(TypedSqlGlobalAggregateTerminal::CountRows),
874        SqlGlobalAggregateTerminal::CountField(field) => Ok(
875            TypedSqlGlobalAggregateTerminal::CountField(resolve_target_slot(field.as_str())?),
876        ),
877        SqlGlobalAggregateTerminal::SumField(field) => Ok(
878            TypedSqlGlobalAggregateTerminal::SumField(resolve_target_slot(field.as_str())?),
879        ),
880        SqlGlobalAggregateTerminal::AvgField(field) => Ok(
881            TypedSqlGlobalAggregateTerminal::AvgField(resolve_target_slot(field.as_str())?),
882        ),
883        SqlGlobalAggregateTerminal::MinField(field) => Ok(
884            TypedSqlGlobalAggregateTerminal::MinField(resolve_target_slot(field.as_str())?),
885        ),
886        SqlGlobalAggregateTerminal::MaxField(field) => Ok(
887            TypedSqlGlobalAggregateTerminal::MaxField(resolve_target_slot(field.as_str())?),
888        ),
889    }
890}
891
892#[inline(never)]
893fn prepare_statement(
894    statement: SqlStatement,
895    expected_entity: &'static str,
896) -> Result<SqlStatement, SqlLoweringError> {
897    match statement {
898        SqlStatement::Select(statement) => Ok(SqlStatement::Select(prepare_select_statement(
899            statement,
900            expected_entity,
901        )?)),
902        SqlStatement::Delete(statement) => Ok(SqlStatement::Delete(prepare_delete_statement(
903            statement,
904            expected_entity,
905        )?)),
906        SqlStatement::Explain(statement) => Ok(SqlStatement::Explain(prepare_explain_statement(
907            statement,
908            expected_entity,
909        )?)),
910        SqlStatement::Describe(statement) => {
911            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
912
913            Ok(SqlStatement::Describe(statement))
914        }
915        SqlStatement::ShowIndexes(statement) => {
916            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
917
918            Ok(SqlStatement::ShowIndexes(statement))
919        }
920        SqlStatement::ShowColumns(statement) => {
921            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
922
923            Ok(SqlStatement::ShowColumns(statement))
924        }
925        SqlStatement::ShowEntities(statement) => Ok(SqlStatement::ShowEntities(statement)),
926    }
927}
928
929fn prepare_explain_statement(
930    statement: SqlExplainStatement,
931    expected_entity: &'static str,
932) -> Result<SqlExplainStatement, SqlLoweringError> {
933    let target = match statement.statement {
934        SqlExplainTarget::Select(select_statement) => {
935            SqlExplainTarget::Select(prepare_select_statement(select_statement, expected_entity)?)
936        }
937        SqlExplainTarget::Delete(delete_statement) => {
938            SqlExplainTarget::Delete(prepare_delete_statement(delete_statement, expected_entity)?)
939        }
940    };
941
942    Ok(SqlExplainStatement {
943        mode: statement.mode,
944        statement: target,
945    })
946}
947
948fn prepare_select_statement(
949    statement: SqlSelectStatement,
950    expected_entity: &'static str,
951) -> Result<SqlSelectStatement, SqlLoweringError> {
952    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
953
954    Ok(normalize_select_statement_to_expected_entity(
955        statement,
956        expected_entity,
957    ))
958}
959
960fn normalize_select_statement_to_expected_entity(
961    mut statement: SqlSelectStatement,
962    expected_entity: &'static str,
963) -> SqlSelectStatement {
964    // Re-scope parsed identifiers onto the resolved entity surface after the
965    // caller has already established entity ownership for this statement.
966    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
967    statement.projection =
968        normalize_projection_identifiers(statement.projection, entity_scope.as_slice());
969    statement.group_by = normalize_identifier_list(statement.group_by, entity_scope.as_slice());
970    statement.predicate = statement
971        .predicate
972        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
973    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
974    statement.having = normalize_having_clauses(statement.having, entity_scope.as_slice());
975
976    statement
977}
978
979fn prepare_delete_statement(
980    mut statement: SqlDeleteStatement,
981    expected_entity: &'static str,
982) -> Result<SqlDeleteStatement, SqlLoweringError> {
983    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
984    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
985    statement.predicate = statement
986        .predicate
987        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
988    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
989
990    Ok(statement)
991}
992
993#[inline(never)]
994fn lower_prepared_statement(
995    statement: SqlStatement,
996    primary_key_field: &str,
997) -> Result<LoweredSqlCommand, SqlLoweringError> {
998    match statement {
999        SqlStatement::Select(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1000            LoweredSqlQuery::Select(lower_select_shape(statement, primary_key_field)?),
1001        ))),
1002        SqlStatement::Delete(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1003            LoweredSqlQuery::Delete(lower_delete_shape(statement)),
1004        ))),
1005        SqlStatement::Explain(statement) => lower_explain_prepared(statement, primary_key_field),
1006        SqlStatement::Describe(_) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::DescribeEntity)),
1007        SqlStatement::ShowIndexes(_) => {
1008            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowIndexesEntity))
1009        }
1010        SqlStatement::ShowColumns(_) => {
1011            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowColumnsEntity))
1012        }
1013        SqlStatement::ShowEntities(_) => {
1014            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowEntities))
1015        }
1016    }
1017}
1018
1019fn lower_explain_prepared(
1020    statement: SqlExplainStatement,
1021    primary_key_field: &str,
1022) -> Result<LoweredSqlCommand, SqlLoweringError> {
1023    let mode = statement.mode;
1024
1025    match statement.statement {
1026        SqlExplainTarget::Select(select_statement) => {
1027            lower_explain_select_prepared(select_statement, mode, primary_key_field)
1028        }
1029        SqlExplainTarget::Delete(delete_statement) => {
1030            Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1031                mode,
1032                query: LoweredSqlQuery::Delete(lower_delete_shape(delete_statement)),
1033            }))
1034        }
1035    }
1036}
1037
1038fn lower_explain_select_prepared(
1039    statement: SqlSelectStatement,
1040    mode: SqlExplainMode,
1041    primary_key_field: &str,
1042) -> Result<LoweredSqlCommand, SqlLoweringError> {
1043    match lower_select_shape(statement.clone(), primary_key_field) {
1044        Ok(query) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1045            mode,
1046            query: LoweredSqlQuery::Select(query),
1047        })),
1048        Err(SqlLoweringError::UnsupportedSelectProjection) => {
1049            let command = lower_global_aggregate_select_shape(statement)?;
1050
1051            Ok(LoweredSqlCommand(
1052                LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command },
1053            ))
1054        }
1055        Err(err) => Err(err),
1056    }
1057}
1058
1059fn lower_global_aggregate_select_shape(
1060    statement: SqlSelectStatement,
1061) -> Result<LoweredSqlGlobalAggregateCommand, SqlLoweringError> {
1062    let SqlSelectStatement {
1063        projection,
1064        predicate,
1065        distinct,
1066        group_by,
1067        having,
1068        order_by,
1069        limit,
1070        offset,
1071        entity: _,
1072    } = statement;
1073
1074    if distinct {
1075        return Err(SqlLoweringError::unsupported_select_distinct());
1076    }
1077    if !group_by.is_empty() {
1078        return Err(SqlLoweringError::unsupported_select_group_by());
1079    }
1080    if !having.is_empty() {
1081        return Err(SqlLoweringError::unsupported_select_having());
1082    }
1083
1084    let terminal = lower_global_aggregate_terminal(projection)?;
1085
1086    Ok(LoweredSqlGlobalAggregateCommand {
1087        query: LoweredBaseQueryShape {
1088            predicate,
1089            order_by,
1090            limit,
1091            offset,
1092        },
1093        terminal,
1094    })
1095}
1096
1097///
1098/// ResolvedHavingClause
1099///
1100/// Pre-resolved HAVING clause shape after SQL projection aggregate index
1101/// resolution. This keeps SQL shape analysis entity-agnostic before typed
1102/// query binding.
1103///
1104#[derive(Clone, Debug)]
1105enum ResolvedHavingClause {
1106    GroupField {
1107        field: String,
1108        op: crate::db::predicate::CompareOp,
1109        value: crate::value::Value,
1110    },
1111    Aggregate {
1112        aggregate_index: usize,
1113        op: crate::db::predicate::CompareOp,
1114        value: crate::value::Value,
1115    },
1116}
1117
1118///
1119/// LoweredSelectShape
1120///
1121/// Entity-agnostic lowered SQL SELECT shape prepared for typed `Query<E>`
1122/// binding.
1123///
1124#[derive(Clone, Debug)]
1125pub(crate) struct LoweredSelectShape {
1126    scalar_projection_fields: Option<Vec<String>>,
1127    grouped_projection_aggregates: Vec<SqlAggregateCall>,
1128    group_by_fields: Vec<String>,
1129    distinct: bool,
1130    having: Vec<ResolvedHavingClause>,
1131    predicate: Option<Predicate>,
1132    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1133    limit: Option<u32>,
1134    offset: Option<u32>,
1135}
1136
1137impl LoweredSelectShape {
1138    // Report whether this lowered select shape carries grouped execution state.
1139    const fn has_grouping(&self) -> bool {
1140        !self.group_by_fields.is_empty()
1141    }
1142}
1143
1144///
1145/// LoweredBaseQueryShape
1146///
1147/// Generic-free filter/order/window query modifiers shared by delete and
1148/// global-aggregate SQL lowering.
1149/// This keeps common SQL query-shape lowering shared before typed query
1150/// binding.
1151///
1152#[derive(Clone, Debug)]
1153pub(crate) struct LoweredBaseQueryShape {
1154    predicate: Option<Predicate>,
1155    order_by: Vec<SqlOrderTerm>,
1156    limit: Option<u32>,
1157    offset: Option<u32>,
1158}
1159
1160#[inline(never)]
1161fn lower_select_shape(
1162    statement: SqlSelectStatement,
1163    primary_key_field: &str,
1164) -> Result<LoweredSelectShape, SqlLoweringError> {
1165    let SqlSelectStatement {
1166        projection,
1167        predicate,
1168        distinct,
1169        group_by,
1170        having,
1171        order_by,
1172        limit,
1173        offset,
1174        entity: _,
1175    } = statement;
1176    let projection_for_having = projection.clone();
1177
1178    // Phase 1: resolve scalar/grouped projection shape.
1179    let (scalar_projection_fields, grouped_projection_aggregates) = if group_by.is_empty() {
1180        let scalar_projection_fields =
1181            lower_scalar_projection_fields(projection, distinct, primary_key_field)?;
1182        (scalar_projection_fields, Vec::new())
1183    } else {
1184        if distinct {
1185            return Err(SqlLoweringError::unsupported_select_distinct());
1186        }
1187        let grouped_projection_aggregates =
1188            grouped_projection_aggregate_calls(&projection, group_by.as_slice())?;
1189        (None, grouped_projection_aggregates)
1190    };
1191
1192    // Phase 2: resolve HAVING symbols against grouped projection authority.
1193    let having = lower_having_clauses(
1194        having,
1195        &projection_for_having,
1196        group_by.as_slice(),
1197        grouped_projection_aggregates.as_slice(),
1198    )?;
1199
1200    Ok(LoweredSelectShape {
1201        scalar_projection_fields,
1202        grouped_projection_aggregates,
1203        group_by_fields: group_by,
1204        distinct,
1205        having,
1206        predicate,
1207        order_by,
1208        limit,
1209        offset,
1210    })
1211}
1212
1213fn lower_scalar_projection_fields(
1214    projection: SqlProjection,
1215    distinct: bool,
1216    primary_key_field: &str,
1217) -> Result<Option<Vec<String>>, SqlLoweringError> {
1218    let SqlProjection::Items(items) = projection else {
1219        if distinct {
1220            return Ok(None);
1221        }
1222
1223        return Ok(None);
1224    };
1225
1226    let has_aggregate = items
1227        .iter()
1228        .any(|item| matches!(item, SqlSelectItem::Aggregate(_)));
1229    if has_aggregate {
1230        return Err(SqlLoweringError::unsupported_select_projection());
1231    }
1232
1233    let fields = items
1234        .into_iter()
1235        .map(|item| match item {
1236            SqlSelectItem::Field(field) => Ok(field),
1237            SqlSelectItem::Aggregate(_) | SqlSelectItem::TextFunction(_) => {
1238                Err(SqlLoweringError::unsupported_select_projection())
1239            }
1240        })
1241        .collect::<Result<Vec<_>, _>>()?;
1242
1243    validate_scalar_distinct_projection(distinct, fields.as_slice(), primary_key_field)?;
1244
1245    Ok(Some(fields))
1246}
1247
1248fn validate_scalar_distinct_projection(
1249    distinct: bool,
1250    projection_fields: &[String],
1251    primary_key_field: &str,
1252) -> Result<(), SqlLoweringError> {
1253    if !distinct {
1254        return Ok(());
1255    }
1256
1257    if projection_fields.is_empty() {
1258        return Ok(());
1259    }
1260
1261    let has_primary_key_field = projection_fields
1262        .iter()
1263        .any(|field| field == primary_key_field);
1264    if !has_primary_key_field {
1265        return Err(SqlLoweringError::unsupported_select_distinct());
1266    }
1267
1268    Ok(())
1269}
1270
1271fn lower_having_clauses(
1272    having_clauses: Vec<SqlHavingClause>,
1273    projection: &SqlProjection,
1274    group_by_fields: &[String],
1275    grouped_projection_aggregates: &[SqlAggregateCall],
1276) -> Result<Vec<ResolvedHavingClause>, SqlLoweringError> {
1277    if having_clauses.is_empty() {
1278        return Ok(Vec::new());
1279    }
1280    if group_by_fields.is_empty() {
1281        return Err(SqlLoweringError::unsupported_select_having());
1282    }
1283
1284    let projection_aggregates = grouped_projection_aggregate_calls(projection, group_by_fields)
1285        .map_err(|_| SqlLoweringError::unsupported_select_having())?;
1286    if projection_aggregates.as_slice() != grouped_projection_aggregates {
1287        return Err(SqlLoweringError::unsupported_select_having());
1288    }
1289
1290    let mut lowered = Vec::with_capacity(having_clauses.len());
1291    for clause in having_clauses {
1292        match clause.symbol {
1293            SqlHavingSymbol::Field(field) => lowered.push(ResolvedHavingClause::GroupField {
1294                field,
1295                op: clause.op,
1296                value: clause.value,
1297            }),
1298            SqlHavingSymbol::Aggregate(aggregate) => {
1299                let aggregate_index =
1300                    resolve_having_aggregate_index(&aggregate, grouped_projection_aggregates)?;
1301                lowered.push(ResolvedHavingClause::Aggregate {
1302                    aggregate_index,
1303                    op: clause.op,
1304                    value: clause.value,
1305                });
1306            }
1307        }
1308    }
1309
1310    Ok(lowered)
1311}
1312
1313// Canonicalize strict numeric SQL predicate literals onto the resolved model
1314// field kind so unsigned-width fields keep strict/indexable semantics even
1315// though reduced SQL integer tokens parse through one generic numeric value
1316// variant first.
1317fn canonicalize_sql_predicate_for_model(
1318    model: &'static EntityModel,
1319    predicate: Predicate,
1320) -> Predicate {
1321    match predicate {
1322        Predicate::And(children) => Predicate::And(
1323            children
1324                .into_iter()
1325                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1326                .collect(),
1327        ),
1328        Predicate::Or(children) => Predicate::Or(
1329            children
1330                .into_iter()
1331                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1332                .collect(),
1333        ),
1334        Predicate::Not(inner) => Predicate::Not(Box::new(canonicalize_sql_predicate_for_model(
1335            model, *inner,
1336        ))),
1337        Predicate::Compare(mut cmp) => {
1338            canonicalize_sql_compare_for_model(model, &mut cmp);
1339            Predicate::Compare(cmp)
1340        }
1341        Predicate::True
1342        | Predicate::False
1343        | Predicate::IsNull { .. }
1344        | Predicate::IsNotNull { .. }
1345        | Predicate::IsMissing { .. }
1346        | Predicate::IsEmpty { .. }
1347        | Predicate::IsNotEmpty { .. }
1348        | Predicate::TextContains { .. }
1349        | Predicate::TextContainsCi { .. } => predicate,
1350    }
1351}
1352
1353// Resolve one lowered predicate field onto the runtime model kind that owns
1354// its strict literal compatibility rules.
1355fn model_field_kind(model: &'static EntityModel, field: &str) -> Option<FieldKind> {
1356    model
1357        .fields()
1358        .iter()
1359        .find(|candidate| candidate.name() == field)
1360        .map(crate::model::field::FieldModel::kind)
1361}
1362
1363// Keep SQL-only literal widening narrow:
1364// - only strict equality-style numeric predicates are eligible
1365// - ordering already uses `NumericWiden`
1366// - text and expression-wrapped predicates stay untouched
1367fn canonicalize_sql_compare_for_model(
1368    model: &'static EntityModel,
1369    cmp: &mut crate::db::predicate::ComparePredicate,
1370) {
1371    if cmp.coercion.id != CoercionId::Strict {
1372        return;
1373    }
1374
1375    let Some(field_kind) = model_field_kind(model, &cmp.field) else {
1376        return;
1377    };
1378
1379    match cmp.op {
1380        CompareOp::Eq | CompareOp::Ne => {
1381            if let Some(value) =
1382                canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &cmp.value)
1383            {
1384                cmp.value = value;
1385            }
1386        }
1387        CompareOp::In | CompareOp::NotIn => {
1388            let Value::List(items) = &cmp.value else {
1389                return;
1390            };
1391
1392            let items = items
1393                .iter()
1394                .map(|item| {
1395                    canonicalize_strict_sql_numeric_value_for_kind(&field_kind, item)
1396                        .unwrap_or_else(|| item.clone())
1397                })
1398                .collect();
1399            cmp.value = Value::List(items);
1400        }
1401        CompareOp::Lt
1402        | CompareOp::Lte
1403        | CompareOp::Gt
1404        | CompareOp::Gte
1405        | CompareOp::Contains
1406        | CompareOp::StartsWith
1407        | CompareOp::EndsWith => {}
1408    }
1409}
1410
1411// Convert one parsed SQL numeric literal into the exact runtime `Value` variant
1412// required by the field kind when that conversion is lossless and unambiguous.
1413// This preserves strict equality semantics while still letting SQL express
1414// unsigned-width comparisons such as `Nat16`/`u64` fields.
1415fn canonicalize_strict_sql_numeric_value_for_kind(
1416    kind: &FieldKind,
1417    value: &Value,
1418) -> Option<Value> {
1419    match kind {
1420        FieldKind::Relation { key_kind, .. } => {
1421            canonicalize_strict_sql_numeric_value_for_kind(key_kind, value)
1422        }
1423        FieldKind::Int => match value {
1424            Value::Int(inner) => Some(Value::Int(*inner)),
1425            Value::Uint(inner) => i64::try_from(*inner).ok().map(Value::Int),
1426            _ => None,
1427        },
1428        FieldKind::Uint => match value {
1429            Value::Int(inner) => u64::try_from(*inner).ok().map(Value::Uint),
1430            Value::Uint(inner) => Some(Value::Uint(*inner)),
1431            _ => None,
1432        },
1433        FieldKind::Account
1434        | FieldKind::Blob
1435        | FieldKind::Bool
1436        | FieldKind::Date
1437        | FieldKind::Decimal { .. }
1438        | FieldKind::Duration
1439        | FieldKind::Enum { .. }
1440        | FieldKind::Float32
1441        | FieldKind::Float64
1442        | FieldKind::Int128
1443        | FieldKind::IntBig
1444        | FieldKind::List(_)
1445        | FieldKind::Map { .. }
1446        | FieldKind::Principal
1447        | FieldKind::Set(_)
1448        | FieldKind::Structured { .. }
1449        | FieldKind::Subaccount
1450        | FieldKind::Text
1451        | FieldKind::Timestamp
1452        | FieldKind::Uint128
1453        | FieldKind::UintBig
1454        | FieldKind::Ulid
1455        | FieldKind::Unit => None,
1456    }
1457}
1458
1459#[inline(never)]
1460pub(in crate::db) fn apply_lowered_select_shape(
1461    mut query: StructuralQuery,
1462    lowered: LoweredSelectShape,
1463) -> Result<StructuralQuery, SqlLoweringError> {
1464    let LoweredSelectShape {
1465        scalar_projection_fields,
1466        grouped_projection_aggregates,
1467        group_by_fields,
1468        distinct,
1469        having,
1470        predicate,
1471        order_by,
1472        limit,
1473        offset,
1474    } = lowered;
1475    let model = query.model();
1476
1477    // Phase 1: apply grouped declaration semantics.
1478    for field in group_by_fields {
1479        query = query.group_by(field)?;
1480    }
1481
1482    // Phase 2: apply scalar DISTINCT and projection contracts.
1483    if distinct {
1484        query = query.distinct();
1485    }
1486    if let Some(fields) = scalar_projection_fields {
1487        query = query.select_fields(fields);
1488    }
1489    for aggregate in grouped_projection_aggregates {
1490        query = query.aggregate(lower_aggregate_call(aggregate)?);
1491    }
1492
1493    // Phase 3: bind resolved HAVING clauses against grouped terminals.
1494    for clause in having {
1495        match clause {
1496            ResolvedHavingClause::GroupField { field, op, value } => {
1497                let value = model_field_kind(model, &field)
1498                    .and_then(|field_kind| {
1499                        canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &value)
1500                    })
1501                    .unwrap_or(value);
1502                query = query.having_group(field, op, value)?;
1503            }
1504            ResolvedHavingClause::Aggregate {
1505                aggregate_index,
1506                op,
1507                value,
1508            } => {
1509                query = query.having_aggregate(aggregate_index, op, value)?;
1510            }
1511        }
1512    }
1513
1514    // Phase 4: attach the shared filter/order/page tail through the base-query lane.
1515    Ok(apply_lowered_base_query_shape(
1516        query,
1517        LoweredBaseQueryShape {
1518            predicate: predicate
1519                .map(|predicate| canonicalize_sql_predicate_for_model(model, predicate)),
1520            order_by,
1521            limit,
1522            offset,
1523        },
1524    ))
1525}
1526
1527fn apply_lowered_base_query_shape(
1528    mut query: StructuralQuery,
1529    lowered: LoweredBaseQueryShape,
1530) -> StructuralQuery {
1531    if let Some(predicate) = lowered.predicate {
1532        query = query.filter(predicate);
1533    }
1534    query = apply_order_terms_structural(query, lowered.order_by);
1535    if let Some(limit) = lowered.limit {
1536        query = query.limit(limit);
1537    }
1538    if let Some(offset) = lowered.offset {
1539        query = query.offset(offset);
1540    }
1541
1542    query
1543}
1544
1545pub(in crate::db) fn bind_lowered_sql_query_structural(
1546    model: &'static crate::model::entity::EntityModel,
1547    lowered: LoweredSqlQuery,
1548    consistency: MissingRowPolicy,
1549) -> Result<StructuralQuery, SqlLoweringError> {
1550    match lowered {
1551        LoweredSqlQuery::Select(select) => {
1552            bind_lowered_sql_select_query_structural(model, select, consistency)
1553        }
1554        LoweredSqlQuery::Delete(delete) => Ok(bind_lowered_sql_delete_query_structural(
1555            model,
1556            delete,
1557            consistency,
1558        )),
1559    }
1560}
1561
1562/// Bind one lowered SQL SELECT shape onto the structural query surface.
1563///
1564/// This keeps the field-only SQL read lane narrow and owner-local: any caller
1565/// that already resolved entity authority can reuse the same lowered-SELECT to
1566/// structural-query boundary without reopening SQL shape application itself.
1567pub(in crate::db) fn bind_lowered_sql_select_query_structural(
1568    model: &'static crate::model::entity::EntityModel,
1569    select: LoweredSelectShape,
1570    consistency: MissingRowPolicy,
1571) -> Result<StructuralQuery, SqlLoweringError> {
1572    apply_lowered_select_shape(StructuralQuery::new(model, consistency), select)
1573}
1574
1575pub(in crate::db) fn bind_lowered_sql_delete_query_structural(
1576    model: &'static crate::model::entity::EntityModel,
1577    delete: LoweredBaseQueryShape,
1578    consistency: MissingRowPolicy,
1579) -> StructuralQuery {
1580    apply_lowered_base_query_shape(StructuralQuery::new(model, consistency).delete(), delete)
1581}
1582
1583pub(in crate::db) fn bind_lowered_sql_query<E: EntityKind>(
1584    lowered: LoweredSqlQuery,
1585    consistency: MissingRowPolicy,
1586) -> Result<Query<E>, SqlLoweringError> {
1587    let structural = bind_lowered_sql_query_structural(E::MODEL, lowered, consistency)?;
1588
1589    Ok(Query::from_inner(structural))
1590}
1591
1592fn bind_lowered_sql_global_aggregate_command<E: EntityKind>(
1593    lowered: LoweredSqlGlobalAggregateCommand,
1594    consistency: MissingRowPolicy,
1595) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
1596    let terminal = bind_lowered_sql_global_aggregate_terminal::<E>(lowered.terminal)?;
1597
1598    Ok(SqlGlobalAggregateCommand {
1599        query: Query::from_inner(apply_lowered_base_query_shape(
1600            StructuralQuery::new(E::MODEL, consistency),
1601            lowered.query,
1602        )),
1603        terminal,
1604    })
1605}
1606
1607fn bind_lowered_sql_global_aggregate_command_structural(
1608    model: &'static crate::model::entity::EntityModel,
1609    lowered: LoweredSqlGlobalAggregateCommand,
1610    consistency: MissingRowPolicy,
1611) -> SqlGlobalAggregateCommandCore {
1612    SqlGlobalAggregateCommandCore {
1613        query: apply_lowered_base_query_shape(
1614            StructuralQuery::new(model, consistency),
1615            lowered.query,
1616        ),
1617        terminal: lowered.terminal,
1618    }
1619}
1620
1621fn lower_global_aggregate_terminal(
1622    projection: SqlProjection,
1623) -> Result<SqlGlobalAggregateTerminal, SqlLoweringError> {
1624    let SqlProjection::Items(items) = projection else {
1625        return Err(SqlLoweringError::unsupported_select_projection());
1626    };
1627    if items.len() != 1 {
1628        return Err(SqlLoweringError::unsupported_select_projection());
1629    }
1630
1631    let Some(SqlSelectItem::Aggregate(aggregate)) = items.into_iter().next() else {
1632        return Err(SqlLoweringError::unsupported_select_projection());
1633    };
1634
1635    match lower_sql_aggregate_shape(aggregate)? {
1636        LoweredSqlAggregateShape::CountRows => Ok(SqlGlobalAggregateTerminal::CountRows),
1637        LoweredSqlAggregateShape::CountField(field) => {
1638            Ok(SqlGlobalAggregateTerminal::CountField(field))
1639        }
1640        LoweredSqlAggregateShape::FieldTarget {
1641            kind: SqlAggregateKind::Sum,
1642            field,
1643        } => Ok(SqlGlobalAggregateTerminal::SumField(field)),
1644        LoweredSqlAggregateShape::FieldTarget {
1645            kind: SqlAggregateKind::Avg,
1646            field,
1647        } => Ok(SqlGlobalAggregateTerminal::AvgField(field)),
1648        LoweredSqlAggregateShape::FieldTarget {
1649            kind: SqlAggregateKind::Min,
1650            field,
1651        } => Ok(SqlGlobalAggregateTerminal::MinField(field)),
1652        LoweredSqlAggregateShape::FieldTarget {
1653            kind: SqlAggregateKind::Max,
1654            field,
1655        } => Ok(SqlGlobalAggregateTerminal::MaxField(field)),
1656        LoweredSqlAggregateShape::FieldTarget {
1657            kind: SqlAggregateKind::Count,
1658            ..
1659        } => Err(SqlLoweringError::unsupported_select_projection()),
1660    }
1661}
1662
1663fn lower_sql_aggregate_shape(
1664    call: SqlAggregateCall,
1665) -> Result<LoweredSqlAggregateShape, SqlLoweringError> {
1666    match (call.kind, call.field) {
1667        (SqlAggregateKind::Count, None) => Ok(LoweredSqlAggregateShape::CountRows),
1668        (SqlAggregateKind::Count, Some(field)) => Ok(LoweredSqlAggregateShape::CountField(field)),
1669        (
1670            kind @ (SqlAggregateKind::Sum
1671            | SqlAggregateKind::Avg
1672            | SqlAggregateKind::Min
1673            | SqlAggregateKind::Max),
1674            Some(field),
1675        ) => Ok(LoweredSqlAggregateShape::FieldTarget { kind, field }),
1676        _ => Err(SqlLoweringError::unsupported_select_projection()),
1677    }
1678}
1679
1680fn grouped_projection_aggregate_calls(
1681    projection: &SqlProjection,
1682    group_by_fields: &[String],
1683) -> Result<Vec<SqlAggregateCall>, SqlLoweringError> {
1684    if group_by_fields.is_empty() {
1685        return Err(SqlLoweringError::unsupported_select_group_by());
1686    }
1687
1688    let SqlProjection::Items(items) = projection else {
1689        return Err(SqlLoweringError::unsupported_select_group_by());
1690    };
1691
1692    let mut projected_group_fields = Vec::<String>::new();
1693    let mut aggregate_calls = Vec::<SqlAggregateCall>::new();
1694    let mut seen_aggregate = false;
1695
1696    for item in items {
1697        match item {
1698            SqlSelectItem::Field(field) => {
1699                // Keep grouped projection deterministic and mappable to grouped
1700                // response contracts: group keys must be declared first.
1701                if seen_aggregate {
1702                    return Err(SqlLoweringError::unsupported_select_group_by());
1703                }
1704                projected_group_fields.push(field.clone());
1705            }
1706            SqlSelectItem::Aggregate(aggregate) => {
1707                seen_aggregate = true;
1708                aggregate_calls.push(aggregate.clone());
1709            }
1710            SqlSelectItem::TextFunction(_) => {
1711                return Err(SqlLoweringError::unsupported_select_group_by());
1712            }
1713        }
1714    }
1715
1716    if aggregate_calls.is_empty() || projected_group_fields.as_slice() != group_by_fields {
1717        return Err(SqlLoweringError::unsupported_select_group_by());
1718    }
1719
1720    Ok(aggregate_calls)
1721}
1722
1723fn lower_aggregate_call(
1724    call: SqlAggregateCall,
1725) -> Result<crate::db::query::builder::AggregateExpr, SqlLoweringError> {
1726    match lower_sql_aggregate_shape(call)? {
1727        LoweredSqlAggregateShape::CountRows => Ok(count()),
1728        LoweredSqlAggregateShape::CountField(field) => Ok(count_by(field)),
1729        LoweredSqlAggregateShape::FieldTarget {
1730            kind: SqlAggregateKind::Sum,
1731            field,
1732        } => Ok(sum(field)),
1733        LoweredSqlAggregateShape::FieldTarget {
1734            kind: SqlAggregateKind::Avg,
1735            field,
1736        } => Ok(avg(field)),
1737        LoweredSqlAggregateShape::FieldTarget {
1738            kind: SqlAggregateKind::Min,
1739            field,
1740        } => Ok(min_by(field)),
1741        LoweredSqlAggregateShape::FieldTarget {
1742            kind: SqlAggregateKind::Max,
1743            field,
1744        } => Ok(max_by(field)),
1745        LoweredSqlAggregateShape::FieldTarget {
1746            kind: SqlAggregateKind::Count,
1747            ..
1748        } => Err(SqlLoweringError::unsupported_select_projection()),
1749    }
1750}
1751
1752fn resolve_having_aggregate_index(
1753    target: &SqlAggregateCall,
1754    grouped_projection_aggregates: &[SqlAggregateCall],
1755) -> Result<usize, SqlLoweringError> {
1756    let mut matched = grouped_projection_aggregates
1757        .iter()
1758        .enumerate()
1759        .filter_map(|(index, aggregate)| (aggregate == target).then_some(index));
1760    let Some(index) = matched.next() else {
1761        return Err(SqlLoweringError::unsupported_select_having());
1762    };
1763    if matched.next().is_some() {
1764        return Err(SqlLoweringError::unsupported_select_having());
1765    }
1766
1767    Ok(index)
1768}
1769
1770fn lower_delete_shape(statement: SqlDeleteStatement) -> LoweredBaseQueryShape {
1771    let SqlDeleteStatement {
1772        predicate,
1773        order_by,
1774        limit,
1775        entity: _,
1776    } = statement;
1777
1778    LoweredBaseQueryShape {
1779        predicate,
1780        order_by,
1781        limit,
1782        offset: None,
1783    }
1784}
1785
1786fn apply_order_terms_structural(
1787    mut query: StructuralQuery,
1788    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1789) -> StructuralQuery {
1790    for term in order_by {
1791        query = match term.direction {
1792            SqlOrderDirection::Asc => query.order_by(term.field),
1793            SqlOrderDirection::Desc => query.order_by_desc(term.field),
1794        };
1795    }
1796
1797    query
1798}
1799
1800fn normalize_having_clauses(
1801    clauses: Vec<SqlHavingClause>,
1802    entity_scope: &[String],
1803) -> Vec<SqlHavingClause> {
1804    clauses
1805        .into_iter()
1806        .map(|clause| SqlHavingClause {
1807            symbol: normalize_having_symbol(clause.symbol, entity_scope),
1808            op: clause.op,
1809            value: clause.value,
1810        })
1811        .collect()
1812}
1813
1814fn normalize_having_symbol(symbol: SqlHavingSymbol, entity_scope: &[String]) -> SqlHavingSymbol {
1815    match symbol {
1816        SqlHavingSymbol::Field(field) => {
1817            SqlHavingSymbol::Field(normalize_identifier_to_scope(field, entity_scope))
1818        }
1819        SqlHavingSymbol::Aggregate(aggregate) => SqlHavingSymbol::Aggregate(
1820            normalize_aggregate_call_identifiers(aggregate, entity_scope),
1821        ),
1822    }
1823}
1824
1825fn normalize_aggregate_call_identifiers(
1826    aggregate: SqlAggregateCall,
1827    entity_scope: &[String],
1828) -> SqlAggregateCall {
1829    SqlAggregateCall {
1830        kind: aggregate.kind,
1831        field: aggregate
1832            .field
1833            .map(|field| normalize_identifier_to_scope(field, entity_scope)),
1834    }
1835}
1836
1837// Build one identifier scope used for reducing SQL-qualified field references
1838// (`entity.field`, `schema.entity.field`) into canonical planner field names.
1839fn sql_entity_scope_candidates(sql_entity: &str, expected_entity: &'static str) -> Vec<String> {
1840    let mut out = Vec::new();
1841    out.push(sql_entity.to_string());
1842    out.push(expected_entity.to_string());
1843
1844    if let Some(last) = identifier_last_segment(sql_entity) {
1845        out.push(last.to_string());
1846    }
1847    if let Some(last) = identifier_last_segment(expected_entity) {
1848        out.push(last.to_string());
1849    }
1850
1851    out
1852}
1853
1854fn normalize_projection_identifiers(
1855    projection: SqlProjection,
1856    entity_scope: &[String],
1857) -> SqlProjection {
1858    match projection {
1859        SqlProjection::All => SqlProjection::All,
1860        SqlProjection::Items(items) => SqlProjection::Items(
1861            items
1862                .into_iter()
1863                .map(|item| match item {
1864                    SqlSelectItem::Field(field) => {
1865                        SqlSelectItem::Field(normalize_identifier(field, entity_scope))
1866                    }
1867                    SqlSelectItem::Aggregate(aggregate) => {
1868                        SqlSelectItem::Aggregate(SqlAggregateCall {
1869                            kind: aggregate.kind,
1870                            field: aggregate
1871                                .field
1872                                .map(|field| normalize_identifier(field, entity_scope)),
1873                        })
1874                    }
1875                    SqlSelectItem::TextFunction(SqlTextFunctionCall {
1876                        function,
1877                        field,
1878                        literal,
1879                        literal2,
1880                        literal3,
1881                    }) => SqlSelectItem::TextFunction(SqlTextFunctionCall {
1882                        function,
1883                        field: normalize_identifier(field, entity_scope),
1884                        literal,
1885                        literal2,
1886                        literal3,
1887                    }),
1888                })
1889                .collect(),
1890        ),
1891    }
1892}
1893
1894fn normalize_order_terms(
1895    terms: Vec<crate::db::sql::parser::SqlOrderTerm>,
1896    entity_scope: &[String],
1897) -> Vec<crate::db::sql::parser::SqlOrderTerm> {
1898    terms
1899        .into_iter()
1900        .map(|term| crate::db::sql::parser::SqlOrderTerm {
1901            field: normalize_order_term_identifier(term.field, entity_scope),
1902            direction: term.direction,
1903        })
1904        .collect()
1905}
1906
1907fn normalize_order_term_identifier(identifier: String, entity_scope: &[String]) -> String {
1908    let Some(expression) = ExpressionOrderTerm::parse(identifier.as_str()) else {
1909        return normalize_identifier(identifier, entity_scope);
1910    };
1911    let normalized_field = normalize_identifier(expression.field().to_string(), entity_scope);
1912
1913    expression.canonical_text_with_field(normalized_field.as_str())
1914}
1915
1916fn normalize_identifier_list(fields: Vec<String>, entity_scope: &[String]) -> Vec<String> {
1917    fields
1918        .into_iter()
1919        .map(|field| normalize_identifier(field, entity_scope))
1920        .collect()
1921}
1922
1923// SQL lowering only adapts identifier qualification (`entity.field` -> `field`)
1924// and delegates predicate-tree traversal ownership to `db::predicate`.
1925fn adapt_predicate_identifiers_to_scope(
1926    predicate: Predicate,
1927    entity_scope: &[String],
1928) -> Predicate {
1929    rewrite_field_identifiers(predicate, |field| normalize_identifier(field, entity_scope))
1930}
1931
1932fn normalize_identifier(identifier: String, entity_scope: &[String]) -> String {
1933    normalize_identifier_to_scope(identifier, entity_scope)
1934}
1935
1936fn ensure_entity_matches_expected(
1937    sql_entity: &str,
1938    expected_entity: &'static str,
1939) -> Result<(), SqlLoweringError> {
1940    if identifiers_tail_match(sql_entity, expected_entity) {
1941        return Ok(());
1942    }
1943
1944    Err(SqlLoweringError::entity_mismatch(
1945        sql_entity,
1946        expected_entity,
1947    ))
1948}