Skip to main content

icydb_core/db/sql/lowering/
mod.rs

1//! Module: db::sql::lowering
2//! Responsibility: reduced SQL statement lowering into canonical query intent.
3//! Does not own: SQL tokenization/parsing, planner validation policy, or executor semantics.
4//! Boundary: frontend-only translation from parsed SQL statement contracts to `Query<E>`.
5
6///
7/// TESTS
8///
9
10#[cfg(test)]
11mod tests;
12
13use crate::{
14    db::{
15        predicate::{CoercionId, CompareOp, MissingRowPolicy, Predicate},
16        query::{
17            builder::{
18                AggregateExpr,
19                aggregate::{avg, count, count_by, max_by, min_by, sum},
20            },
21            intent::{Query, QueryError, StructuralQuery},
22            plan::{
23                AggregateKind, ExpressionOrderTerm, FieldSlot, resolve_aggregate_target_field_slot,
24            },
25        },
26        sql::identifier::{
27            identifier_last_segment, identifiers_tail_match, normalize_identifier_to_scope,
28            rewrite_field_identifiers,
29        },
30        sql::parser::{
31            SqlAggregateCall, SqlAggregateKind, SqlDeleteStatement, SqlExplainMode,
32            SqlExplainStatement, SqlExplainTarget, SqlHavingClause, SqlHavingSymbol,
33            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
34            SqlStatement, SqlTextFunctionCall,
35        },
36    },
37    model::{entity::EntityModel, field::FieldKind},
38    traits::EntityKind,
39    value::Value,
40};
41use thiserror::Error as ThisError;
42
43///
44/// LoweredSqlCommand
45///
46/// Generic-free SQL command shape after reduced SQL parsing and entity-route
47/// normalization.
48/// This keeps statement-shape lowering shared across entities before typed
49/// `Query<E>` binding happens at the execution boundary.
50///
51#[derive(Clone, Debug)]
52pub struct LoweredSqlCommand(LoweredSqlCommandInner);
53
54#[derive(Clone, Debug)]
55enum LoweredSqlCommandInner {
56    Query(LoweredSqlQuery),
57    Explain {
58        mode: SqlExplainMode,
59        query: LoweredSqlQuery,
60    },
61    ExplainGlobalAggregate {
62        mode: SqlExplainMode,
63        command: LoweredSqlGlobalAggregateCommand,
64    },
65    DescribeEntity,
66    ShowIndexesEntity,
67    ShowColumnsEntity,
68    ShowEntities,
69}
70
71///
72/// SqlCommand
73///
74/// Test-only typed SQL command shell over the shared lowered SQL surface.
75/// Runtime dispatch now consumes `LoweredSqlCommand` directly, but lowering
76/// tests still validate typed binding behavior on this local envelope.
77///
78#[cfg(test)]
79#[derive(Debug)]
80pub(crate) enum SqlCommand<E: EntityKind> {
81    Query(Query<E>),
82    Explain {
83        mode: SqlExplainMode,
84        query: Query<E>,
85    },
86    ExplainGlobalAggregate {
87        mode: SqlExplainMode,
88        command: SqlGlobalAggregateCommand<E>,
89    },
90    DescribeEntity,
91    ShowIndexesEntity,
92    ShowColumnsEntity,
93    ShowEntities,
94}
95
96impl LoweredSqlCommand {
97    #[must_use]
98    pub(in crate::db) const fn query(&self) -> Option<&LoweredSqlQuery> {
99        match &self.0 {
100            LoweredSqlCommandInner::Query(query) => Some(query),
101            LoweredSqlCommandInner::Explain { .. }
102            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
103            | LoweredSqlCommandInner::DescribeEntity
104            | LoweredSqlCommandInner::ShowIndexesEntity
105            | LoweredSqlCommandInner::ShowColumnsEntity
106            | LoweredSqlCommandInner::ShowEntities => None,
107        }
108    }
109
110    #[must_use]
111    pub(in crate::db) const fn explain_query(&self) -> Option<(SqlExplainMode, &LoweredSqlQuery)> {
112        match &self.0 {
113            LoweredSqlCommandInner::Explain { mode, query } => Some((*mode, query)),
114            LoweredSqlCommandInner::Query(_)
115            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
116            | LoweredSqlCommandInner::DescribeEntity
117            | LoweredSqlCommandInner::ShowIndexesEntity
118            | LoweredSqlCommandInner::ShowColumnsEntity
119            | LoweredSqlCommandInner::ShowEntities => None,
120        }
121    }
122}
123
124///
125/// LoweredSqlQuery
126///
127/// Generic-free executable SQL query shape prepared before typed query binding.
128/// Select and delete lowering stay shared until the final `Query<E>` build.
129///
130#[derive(Clone, Debug)]
131pub(crate) enum LoweredSqlQuery {
132    Select(LoweredSelectShape),
133    Delete(LoweredBaseQueryShape),
134}
135
136impl LoweredSqlQuery {
137    // Report whether this lowered query carries grouped execution semantics.
138    pub(crate) const fn has_grouping(&self) -> bool {
139        match self {
140            Self::Select(select) => select.has_grouping(),
141            Self::Delete(_) => false,
142        }
143    }
144}
145
146///
147/// SqlGlobalAggregateTerminal
148///
149/// Global SQL aggregate terminals currently executable through dedicated
150/// aggregate SQL entrypoints.
151///
152
153#[derive(Clone, Debug, Eq, PartialEq)]
154pub(crate) enum SqlGlobalAggregateTerminal {
155    CountRows,
156    CountField(String),
157    SumField(String),
158    AvgField(String),
159    MinField(String),
160    MaxField(String),
161}
162
163///
164/// TypedSqlGlobalAggregateTerminal
165///
166/// TypedSqlGlobalAggregateTerminal is the typed global aggregate contract used
167/// after entity binding resolves one concrete model.
168/// Field-target variants carry a resolved planner field slot so typed SQL
169/// aggregate execution does not re-resolve the same field name before dispatch.
170///
171#[derive(Clone, Debug, Eq, PartialEq)]
172pub(crate) enum TypedSqlGlobalAggregateTerminal {
173    CountRows,
174    CountField(FieldSlot),
175    SumField(FieldSlot),
176    AvgField(FieldSlot),
177    MinField(FieldSlot),
178    MaxField(FieldSlot),
179}
180
181/// PreparedSqlScalarAggregateDomain
182///
183/// Typed SQL scalar aggregate execution domain selected before session runtime
184/// dispatch. This keeps the typed aggregate lane explicit about which internal
185/// execution family will consume the request.
186#[derive(Clone, Copy, Debug, Eq, PartialEq)]
187pub(crate) enum PreparedSqlScalarAggregateDomain {
188    ExistingRows,
189    ProjectionField,
190    NumericField,
191    ScalarExtremaValue,
192}
193
194/// PreparedSqlScalarAggregateOrderingRequirement
195///
196/// Ordering sensitivity required by the selected typed SQL scalar aggregate
197/// strategy. This keeps first-slice descriptor/explain consumers off local
198/// kind checks when they need to know whether field order semantics matter.
199#[derive(Clone, Copy, Debug, Eq, PartialEq)]
200pub(crate) enum PreparedSqlScalarAggregateOrderingRequirement {
201    None,
202    FieldOrder,
203}
204
205/// PreparedSqlScalarAggregateRowSource
206///
207/// Canonical row-source shape for one prepared typed SQL scalar aggregate
208/// strategy. This describes what kind of row-derived data the execution family
209/// ultimately consumes.
210#[derive(Clone, Copy, Debug, Eq, PartialEq)]
211pub(crate) enum PreparedSqlScalarAggregateRowSource {
212    ExistingRows,
213    ProjectedField,
214    NumericField,
215    ExtremalWinnerField,
216}
217
218/// PreparedSqlScalarAggregateEmptySetBehavior
219///
220/// Canonical empty-window result behavior for one prepared typed SQL scalar
221/// aggregate strategy.
222#[derive(Clone, Copy, Debug, Eq, PartialEq)]
223pub(crate) enum PreparedSqlScalarAggregateEmptySetBehavior {
224    Zero,
225    Null,
226}
227
228/// PreparedSqlScalarAggregateDescriptorShape
229///
230/// Stable typed SQL scalar aggregate descriptor shape derived once at the SQL
231/// aggregate preparation boundary and reused by runtime/EXPLAIN projections.
232#[derive(Clone, Copy, Debug, Eq, PartialEq)]
233pub(crate) enum PreparedSqlScalarAggregateDescriptorShape {
234    CountRows,
235    CountField,
236    SumField,
237    AvgField,
238    MinField,
239    MaxField,
240}
241
242/// PreparedSqlScalarAggregateRuntimeDescriptor
243///
244/// Stable runtime-family projection for one prepared typed SQL scalar
245/// aggregate strategy.
246/// Session SQL aggregate execution consumes this descriptor instead of
247/// rebuilding runtime boundary choice from raw SQL terminal variants or
248/// parallel metadata tuple matches.
249#[derive(Clone, Copy, Debug, Eq, PartialEq)]
250pub(crate) enum PreparedSqlScalarAggregateRuntimeDescriptor {
251    CountRows,
252    CountField,
253    NumericField { kind: AggregateKind },
254    ExtremalWinnerField { kind: AggregateKind },
255}
256
257///
258/// PreparedSqlScalarAggregateStrategy
259///
260/// PreparedSqlScalarAggregateStrategy is the single typed SQL scalar aggregate
261/// behavior source for the first `0.71` slice.
262/// It resolves aggregate domain, descriptor shape, target-slot ownership, and
263/// aggregate expression once so runtime and EXPLAIN do not re-derive that
264/// behavior from raw SQL terminal variants.
265///
266#[derive(Clone, Debug, Eq, PartialEq)]
267pub(crate) struct PreparedSqlScalarAggregateStrategy {
268    aggregate: AggregateExpr,
269    target_slot: Option<FieldSlot>,
270    domain: PreparedSqlScalarAggregateDomain,
271    ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
272    row_source: PreparedSqlScalarAggregateRowSource,
273    empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
274    descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
275}
276
277impl PreparedSqlScalarAggregateStrategy {
278    const fn new(
279        aggregate: AggregateExpr,
280        target_slot: Option<FieldSlot>,
281        domain: PreparedSqlScalarAggregateDomain,
282        ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
283        row_source: PreparedSqlScalarAggregateRowSource,
284        empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
285        descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
286    ) -> Self {
287        Self {
288            aggregate,
289            target_slot,
290            domain,
291            ordering_requirement,
292            row_source,
293            empty_set_behavior,
294            descriptor_shape,
295        }
296    }
297
298    fn from_typed_terminal(terminal: &TypedSqlGlobalAggregateTerminal) -> Self {
299        match terminal {
300            TypedSqlGlobalAggregateTerminal::CountRows => Self::new(
301                count(),
302                None,
303                PreparedSqlScalarAggregateDomain::ExistingRows,
304                PreparedSqlScalarAggregateOrderingRequirement::None,
305                PreparedSqlScalarAggregateRowSource::ExistingRows,
306                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
307                PreparedSqlScalarAggregateDescriptorShape::CountRows,
308            ),
309            TypedSqlGlobalAggregateTerminal::CountField(target_slot) => Self::new(
310                count_by(target_slot.field()),
311                Some(target_slot.clone()),
312                PreparedSqlScalarAggregateDomain::ProjectionField,
313                PreparedSqlScalarAggregateOrderingRequirement::None,
314                PreparedSqlScalarAggregateRowSource::ProjectedField,
315                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
316                PreparedSqlScalarAggregateDescriptorShape::CountField,
317            ),
318            TypedSqlGlobalAggregateTerminal::SumField(target_slot) => Self::new(
319                sum(target_slot.field()),
320                Some(target_slot.clone()),
321                PreparedSqlScalarAggregateDomain::NumericField,
322                PreparedSqlScalarAggregateOrderingRequirement::None,
323                PreparedSqlScalarAggregateRowSource::NumericField,
324                PreparedSqlScalarAggregateEmptySetBehavior::Null,
325                PreparedSqlScalarAggregateDescriptorShape::SumField,
326            ),
327            TypedSqlGlobalAggregateTerminal::AvgField(target_slot) => Self::new(
328                avg(target_slot.field()),
329                Some(target_slot.clone()),
330                PreparedSqlScalarAggregateDomain::NumericField,
331                PreparedSqlScalarAggregateOrderingRequirement::None,
332                PreparedSqlScalarAggregateRowSource::NumericField,
333                PreparedSqlScalarAggregateEmptySetBehavior::Null,
334                PreparedSqlScalarAggregateDescriptorShape::AvgField,
335            ),
336            TypedSqlGlobalAggregateTerminal::MinField(target_slot) => Self::new(
337                min_by(target_slot.field()),
338                Some(target_slot.clone()),
339                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
340                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
341                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
342                PreparedSqlScalarAggregateEmptySetBehavior::Null,
343                PreparedSqlScalarAggregateDescriptorShape::MinField,
344            ),
345            TypedSqlGlobalAggregateTerminal::MaxField(target_slot) => Self::new(
346                max_by(target_slot.field()),
347                Some(target_slot.clone()),
348                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
349                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
350                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
351                PreparedSqlScalarAggregateEmptySetBehavior::Null,
352                PreparedSqlScalarAggregateDescriptorShape::MaxField,
353            ),
354        }
355    }
356
357    fn from_lowered_terminal_with_model(
358        model: &'static EntityModel,
359        terminal: &SqlGlobalAggregateTerminal,
360    ) -> Result<Self, SqlLoweringError> {
361        let resolve_target_slot = |field: &str| {
362            resolve_aggregate_target_field_slot(model, field).map_err(SqlLoweringError::from)
363        };
364
365        match terminal {
366            SqlGlobalAggregateTerminal::CountRows => Ok(Self::new(
367                count(),
368                None,
369                PreparedSqlScalarAggregateDomain::ExistingRows,
370                PreparedSqlScalarAggregateOrderingRequirement::None,
371                PreparedSqlScalarAggregateRowSource::ExistingRows,
372                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
373                PreparedSqlScalarAggregateDescriptorShape::CountRows,
374            )),
375            SqlGlobalAggregateTerminal::CountField(field) => {
376                let target_slot = resolve_target_slot(field.as_str())?;
377
378                Ok(Self::new(
379                    count_by(field.as_str()),
380                    Some(target_slot),
381                    PreparedSqlScalarAggregateDomain::ProjectionField,
382                    PreparedSqlScalarAggregateOrderingRequirement::None,
383                    PreparedSqlScalarAggregateRowSource::ProjectedField,
384                    PreparedSqlScalarAggregateEmptySetBehavior::Zero,
385                    PreparedSqlScalarAggregateDescriptorShape::CountField,
386                ))
387            }
388            SqlGlobalAggregateTerminal::SumField(field) => {
389                let target_slot = resolve_target_slot(field.as_str())?;
390
391                Ok(Self::new(
392                    sum(field.as_str()),
393                    Some(target_slot),
394                    PreparedSqlScalarAggregateDomain::NumericField,
395                    PreparedSqlScalarAggregateOrderingRequirement::None,
396                    PreparedSqlScalarAggregateRowSource::NumericField,
397                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
398                    PreparedSqlScalarAggregateDescriptorShape::SumField,
399                ))
400            }
401            SqlGlobalAggregateTerminal::AvgField(field) => {
402                let target_slot = resolve_target_slot(field.as_str())?;
403
404                Ok(Self::new(
405                    avg(field.as_str()),
406                    Some(target_slot),
407                    PreparedSqlScalarAggregateDomain::NumericField,
408                    PreparedSqlScalarAggregateOrderingRequirement::None,
409                    PreparedSqlScalarAggregateRowSource::NumericField,
410                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
411                    PreparedSqlScalarAggregateDescriptorShape::AvgField,
412                ))
413            }
414            SqlGlobalAggregateTerminal::MinField(field) => {
415                let target_slot = resolve_target_slot(field.as_str())?;
416
417                Ok(Self::new(
418                    min_by(field.as_str()),
419                    Some(target_slot),
420                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
421                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
422                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
423                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
424                    PreparedSqlScalarAggregateDescriptorShape::MinField,
425                ))
426            }
427            SqlGlobalAggregateTerminal::MaxField(field) => {
428                let target_slot = resolve_target_slot(field.as_str())?;
429
430                Ok(Self::new(
431                    max_by(field.as_str()),
432                    Some(target_slot),
433                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
434                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
435                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
436                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
437                    PreparedSqlScalarAggregateDescriptorShape::MaxField,
438                ))
439            }
440        }
441    }
442
443    /// Borrow the aggregate expression projected by this prepared SQL scalar strategy.
444    #[must_use]
445    pub(crate) const fn aggregate(&self) -> &AggregateExpr {
446        &self.aggregate
447    }
448
449    /// Borrow the resolved target slot when this prepared SQL scalar strategy is field-targeted.
450    #[must_use]
451    pub(crate) const fn target_slot(&self) -> Option<&FieldSlot> {
452        self.target_slot.as_ref()
453    }
454
455    /// Return the canonical typed SQL scalar aggregate domain.
456    #[cfg(test)]
457    #[must_use]
458    pub(crate) const fn domain(&self) -> PreparedSqlScalarAggregateDomain {
459        self.domain
460    }
461
462    /// Return the stable descriptor/runtime shape label for this prepared strategy.
463    #[cfg(test)]
464    #[must_use]
465    pub(crate) const fn descriptor_shape(&self) -> PreparedSqlScalarAggregateDescriptorShape {
466        self.descriptor_shape
467    }
468
469    /// Return the stable runtime-family projection for this prepared SQL
470    /// scalar aggregate strategy.
471    #[must_use]
472    pub(crate) const fn runtime_descriptor(&self) -> PreparedSqlScalarAggregateRuntimeDescriptor {
473        match self.descriptor_shape {
474            PreparedSqlScalarAggregateDescriptorShape::CountRows => {
475                PreparedSqlScalarAggregateRuntimeDescriptor::CountRows
476            }
477            PreparedSqlScalarAggregateDescriptorShape::CountField => {
478                PreparedSqlScalarAggregateRuntimeDescriptor::CountField
479            }
480            PreparedSqlScalarAggregateDescriptorShape::SumField => {
481                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
482                    kind: AggregateKind::Sum,
483                }
484            }
485            PreparedSqlScalarAggregateDescriptorShape::AvgField => {
486                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
487                    kind: AggregateKind::Avg,
488                }
489            }
490            PreparedSqlScalarAggregateDescriptorShape::MinField => {
491                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
492                    kind: AggregateKind::Min,
493                }
494            }
495            PreparedSqlScalarAggregateDescriptorShape::MaxField => {
496                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
497                    kind: AggregateKind::Max,
498                }
499            }
500        }
501    }
502
503    /// Return the canonical aggregate kind for this prepared SQL scalar strategy.
504    #[must_use]
505    pub(crate) const fn aggregate_kind(&self) -> AggregateKind {
506        self.aggregate.kind()
507    }
508
509    /// Return the projected field label for descriptor/explain projection when
510    /// this prepared strategy is field-targeted.
511    #[must_use]
512    pub(crate) fn projected_field(&self) -> Option<&str> {
513        self.target_slot().map(FieldSlot::field)
514    }
515
516    /// Return field-order sensitivity for this prepared SQL scalar aggregate strategy.
517    #[cfg(test)]
518    #[must_use]
519    pub(crate) const fn ordering_requirement(
520        &self,
521    ) -> PreparedSqlScalarAggregateOrderingRequirement {
522        self.ordering_requirement
523    }
524
525    /// Return the canonical row-source shape for this prepared strategy.
526    #[cfg(test)]
527    #[must_use]
528    pub(crate) const fn row_source(&self) -> PreparedSqlScalarAggregateRowSource {
529        self.row_source
530    }
531
532    /// Return empty-window behavior for this prepared SQL scalar aggregate strategy.
533    #[cfg(test)]
534    #[must_use]
535    pub(crate) const fn empty_set_behavior(&self) -> PreparedSqlScalarAggregateEmptySetBehavior {
536        self.empty_set_behavior
537    }
538}
539
540///
541/// LoweredSqlGlobalAggregateCommand
542///
543/// Generic-free global aggregate command shape prepared before typed query
544/// binding.
545/// This keeps aggregate SQL lowering shared across entities until the final
546/// execution boundary converts the base query shape into `Query<E>`.
547///
548#[derive(Clone, Debug)]
549pub(crate) struct LoweredSqlGlobalAggregateCommand {
550    query: LoweredBaseQueryShape,
551    terminal: SqlGlobalAggregateTerminal,
552}
553
554///
555/// LoweredSqlAggregateShape
556///
557/// Locally validated aggregate-call shape used by SQL lowering to avoid
558/// duplicating `(SqlAggregateKind, field)` validation across lowering lanes.
559///
560
561enum LoweredSqlAggregateShape {
562    CountRows,
563    CountField(String),
564    FieldTarget {
565        kind: SqlAggregateKind,
566        field: String,
567    },
568}
569
570///
571/// SqlGlobalAggregateCommand
572///
573/// Lowered global SQL aggregate command carrying base query shape plus terminal.
574///
575
576#[derive(Debug)]
577pub(crate) struct SqlGlobalAggregateCommand<E: EntityKind> {
578    query: Query<E>,
579    terminal: TypedSqlGlobalAggregateTerminal,
580}
581
582impl<E: EntityKind> SqlGlobalAggregateCommand<E> {
583    /// Borrow the lowered base query shape for aggregate execution.
584    #[must_use]
585    pub(crate) const fn query(&self) -> &Query<E> {
586        &self.query
587    }
588
589    /// Borrow the lowered aggregate terminal.
590    #[cfg(test)]
591    #[must_use]
592    pub(crate) const fn terminal(&self) -> &TypedSqlGlobalAggregateTerminal {
593        &self.terminal
594    }
595
596    /// Prepare one typed SQL scalar aggregate strategy from the typed command boundary.
597    #[must_use]
598    pub(crate) fn prepared_scalar_strategy(&self) -> PreparedSqlScalarAggregateStrategy {
599        PreparedSqlScalarAggregateStrategy::from_typed_terminal(&self.terminal)
600    }
601}
602
603///
604/// SqlGlobalAggregateCommandCore
605///
606/// Generic-free lowered global aggregate command bound onto the structural
607/// query surface.
608/// This keeps global aggregate EXPLAIN on the shared query/explain path until
609/// a typed boundary is strictly required.
610///
611
612#[derive(Debug)]
613pub(crate) struct SqlGlobalAggregateCommandCore {
614    query: StructuralQuery,
615    terminal: SqlGlobalAggregateTerminal,
616}
617
618impl SqlGlobalAggregateCommandCore {
619    /// Borrow the structural query payload for aggregate explain/execution.
620    #[must_use]
621    pub(in crate::db) const fn query(&self) -> &StructuralQuery {
622        &self.query
623    }
624
625    /// Prepare one structural SQL scalar aggregate strategy using one concrete model.
626    pub(in crate::db) fn prepared_scalar_strategy_with_model(
627        &self,
628        model: &'static EntityModel,
629    ) -> Result<PreparedSqlScalarAggregateStrategy, SqlLoweringError> {
630        PreparedSqlScalarAggregateStrategy::from_lowered_terminal_with_model(model, &self.terminal)
631    }
632}
633
634///
635/// SqlLoweringError
636///
637/// SQL frontend lowering failures before planner validation/execution.
638///
639
640#[derive(Debug, ThisError)]
641pub(crate) enum SqlLoweringError {
642    #[error("{0}")]
643    Parse(#[from] crate::db::sql::parser::SqlParseError),
644
645    #[error("{0}")]
646    Query(#[from] QueryError),
647
648    #[error("SQL entity '{sql_entity}' does not match requested entity type '{expected_entity}'")]
649    EntityMismatch {
650        sql_entity: String,
651        expected_entity: &'static str,
652    },
653
654    #[error(
655        "unsupported SQL SELECT projection; supported forms are SELECT *, field lists, or grouped aggregate shapes"
656    )]
657    UnsupportedSelectProjection,
658
659    #[error("unsupported SQL SELECT DISTINCT")]
660    UnsupportedSelectDistinct,
661
662    #[error("unsupported SQL GROUP BY projection shape")]
663    UnsupportedSelectGroupBy,
664
665    #[error("unsupported SQL HAVING shape")]
666    UnsupportedSelectHaving,
667}
668
669impl SqlLoweringError {
670    /// Construct one entity-mismatch SQL lowering error.
671    fn entity_mismatch(sql_entity: impl Into<String>, expected_entity: &'static str) -> Self {
672        Self::EntityMismatch {
673            sql_entity: sql_entity.into(),
674            expected_entity,
675        }
676    }
677
678    /// Construct one unsupported SELECT projection SQL lowering error.
679    const fn unsupported_select_projection() -> Self {
680        Self::UnsupportedSelectProjection
681    }
682
683    /// Construct one unsupported SELECT DISTINCT SQL lowering error.
684    const fn unsupported_select_distinct() -> Self {
685        Self::UnsupportedSelectDistinct
686    }
687
688    /// Construct one unsupported SELECT GROUP BY shape SQL lowering error.
689    const fn unsupported_select_group_by() -> Self {
690        Self::UnsupportedSelectGroupBy
691    }
692
693    /// Construct one unsupported SELECT HAVING shape SQL lowering error.
694    const fn unsupported_select_having() -> Self {
695        Self::UnsupportedSelectHaving
696    }
697}
698
699///
700/// PreparedSqlStatement
701///
702/// SQL statement envelope after entity-scope normalization and
703/// entity-match validation for one target entity descriptor.
704///
705/// This pre-lowering contract is entity-agnostic and reusable across
706/// dynamic SQL route branches before typed `Query<E>` binding.
707///
708
709#[derive(Clone, Debug)]
710pub(crate) struct PreparedSqlStatement {
711    statement: SqlStatement,
712}
713
714#[derive(Clone, Copy, Debug, Eq, PartialEq)]
715pub(crate) enum LoweredSqlLaneKind {
716    Query,
717    Explain,
718    Describe,
719    ShowIndexes,
720    ShowColumns,
721    ShowEntities,
722}
723
724/// Parse and lower one SQL statement into canonical query intent for `E`.
725#[cfg(test)]
726pub(crate) fn compile_sql_command<E: EntityKind>(
727    sql: &str,
728    consistency: MissingRowPolicy,
729) -> Result<SqlCommand<E>, SqlLoweringError> {
730    let statement = crate::db::sql::parser::parse_sql(sql)?;
731    compile_sql_command_from_statement::<E>(statement, consistency)
732}
733
734/// Lower one parsed SQL statement into canonical query intent for `E`.
735#[cfg(test)]
736pub(crate) fn compile_sql_command_from_statement<E: EntityKind>(
737    statement: SqlStatement,
738    consistency: MissingRowPolicy,
739) -> Result<SqlCommand<E>, SqlLoweringError> {
740    let prepared = prepare_sql_statement(statement, E::MODEL.name())?;
741    compile_sql_command_from_prepared_statement::<E>(prepared, consistency)
742}
743
744/// Lower one prepared SQL statement into canonical query intent for `E`.
745#[cfg(test)]
746pub(crate) fn compile_sql_command_from_prepared_statement<E: EntityKind>(
747    prepared: PreparedSqlStatement,
748    consistency: MissingRowPolicy,
749) -> Result<SqlCommand<E>, SqlLoweringError> {
750    let lowered = lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)?;
751
752    bind_lowered_sql_command::<E>(lowered, consistency)
753}
754
755/// Lower one prepared SQL statement into one shared generic-free command shape.
756#[inline(never)]
757pub(crate) fn lower_sql_command_from_prepared_statement(
758    prepared: PreparedSqlStatement,
759    primary_key_field: &str,
760) -> Result<LoweredSqlCommand, SqlLoweringError> {
761    lower_prepared_statement(prepared.statement, primary_key_field)
762}
763
764pub(crate) const fn lowered_sql_command_lane(command: &LoweredSqlCommand) -> LoweredSqlLaneKind {
765    match command.0 {
766        LoweredSqlCommandInner::Query(_) => LoweredSqlLaneKind::Query,
767        LoweredSqlCommandInner::Explain { .. }
768        | LoweredSqlCommandInner::ExplainGlobalAggregate { .. } => LoweredSqlLaneKind::Explain,
769        LoweredSqlCommandInner::DescribeEntity => LoweredSqlLaneKind::Describe,
770        LoweredSqlCommandInner::ShowIndexesEntity => LoweredSqlLaneKind::ShowIndexes,
771        LoweredSqlCommandInner::ShowColumnsEntity => LoweredSqlLaneKind::ShowColumns,
772        LoweredSqlCommandInner::ShowEntities => LoweredSqlLaneKind::ShowEntities,
773    }
774}
775
776/// Return whether one parsed SQL statement is an executable constrained global
777/// aggregate shape owned by the dedicated aggregate lane.
778pub(in crate::db) fn is_sql_global_aggregate_statement(statement: &SqlStatement) -> bool {
779    let SqlStatement::Select(statement) = statement else {
780        return false;
781    };
782
783    is_sql_global_aggregate_select(statement)
784}
785
786// Detect one constrained global aggregate select shape without widening any
787// non-aggregate SQL surface onto the dedicated aggregate execution lane.
788fn is_sql_global_aggregate_select(statement: &SqlSelectStatement) -> bool {
789    if statement.distinct || !statement.group_by.is_empty() || !statement.having.is_empty() {
790        return false;
791    }
792
793    lower_global_aggregate_terminal(statement.projection.clone()).is_ok()
794}
795
796/// Bind one lowered global aggregate EXPLAIN shape onto the structural query
797/// surface when the explain command carries that specialized form.
798pub(crate) fn bind_lowered_sql_explain_global_aggregate_structural(
799    lowered: &LoweredSqlCommand,
800    model: &'static crate::model::entity::EntityModel,
801    consistency: MissingRowPolicy,
802) -> Option<(SqlExplainMode, SqlGlobalAggregateCommandCore)> {
803    let LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } = &lowered.0 else {
804        return None;
805    };
806
807    Some((
808        *mode,
809        bind_lowered_sql_global_aggregate_command_structural(model, command.clone(), consistency),
810    ))
811}
812
813/// Bind one shared generic-free SQL command shape to the typed query surface.
814#[cfg(test)]
815pub(crate) fn bind_lowered_sql_command<E: EntityKind>(
816    lowered: LoweredSqlCommand,
817    consistency: MissingRowPolicy,
818) -> Result<SqlCommand<E>, SqlLoweringError> {
819    match lowered.0 {
820        LoweredSqlCommandInner::Query(query) => Ok(SqlCommand::Query(bind_lowered_sql_query::<E>(
821            query,
822            consistency,
823        )?)),
824        LoweredSqlCommandInner::Explain { mode, query } => Ok(SqlCommand::Explain {
825            mode,
826            query: bind_lowered_sql_query::<E>(query, consistency)?,
827        }),
828        LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } => {
829            Ok(SqlCommand::ExplainGlobalAggregate {
830                mode,
831                command: bind_lowered_sql_global_aggregate_command::<E>(command, consistency)?,
832            })
833        }
834        LoweredSqlCommandInner::DescribeEntity => Ok(SqlCommand::DescribeEntity),
835        LoweredSqlCommandInner::ShowIndexesEntity => Ok(SqlCommand::ShowIndexesEntity),
836        LoweredSqlCommandInner::ShowColumnsEntity => Ok(SqlCommand::ShowColumnsEntity),
837        LoweredSqlCommandInner::ShowEntities => Ok(SqlCommand::ShowEntities),
838    }
839}
840
841/// Prepare one parsed SQL statement for one expected entity route.
842#[inline(never)]
843pub(crate) fn prepare_sql_statement(
844    statement: SqlStatement,
845    expected_entity: &'static str,
846) -> Result<PreparedSqlStatement, SqlLoweringError> {
847    let statement = prepare_statement(statement, expected_entity)?;
848
849    Ok(PreparedSqlStatement { statement })
850}
851
852/// Parse and lower one SQL statement into global aggregate execution command for `E`.
853#[cfg(test)]
854pub(crate) fn compile_sql_global_aggregate_command<E: EntityKind>(
855    sql: &str,
856    consistency: MissingRowPolicy,
857) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
858    let statement = crate::db::sql::parser::parse_sql(sql)?;
859    let prepared = prepare_sql_statement(statement, E::MODEL.name())?;
860    compile_sql_global_aggregate_command_from_prepared::<E>(prepared, consistency)
861}
862
863// Lower one already-prepared SQL statement into the constrained global
864// aggregate command envelope so callers that already parsed and routed the
865// statement do not pay the parser again.
866pub(crate) fn compile_sql_global_aggregate_command_from_prepared<E: EntityKind>(
867    prepared: PreparedSqlStatement,
868    consistency: MissingRowPolicy,
869) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
870    let SqlStatement::Select(statement) = prepared.statement else {
871        return Err(SqlLoweringError::unsupported_select_projection());
872    };
873
874    bind_lowered_sql_global_aggregate_command::<E>(
875        lower_global_aggregate_select_shape(statement)?,
876        consistency,
877    )
878}
879
880fn bind_lowered_sql_global_aggregate_terminal<E: EntityKind>(
881    terminal: SqlGlobalAggregateTerminal,
882) -> Result<TypedSqlGlobalAggregateTerminal, SqlLoweringError> {
883    let resolve_target_slot = |field: &str| {
884        resolve_aggregate_target_field_slot(E::MODEL, field).map_err(SqlLoweringError::from)
885    };
886
887    match terminal {
888        SqlGlobalAggregateTerminal::CountRows => Ok(TypedSqlGlobalAggregateTerminal::CountRows),
889        SqlGlobalAggregateTerminal::CountField(field) => Ok(
890            TypedSqlGlobalAggregateTerminal::CountField(resolve_target_slot(field.as_str())?),
891        ),
892        SqlGlobalAggregateTerminal::SumField(field) => Ok(
893            TypedSqlGlobalAggregateTerminal::SumField(resolve_target_slot(field.as_str())?),
894        ),
895        SqlGlobalAggregateTerminal::AvgField(field) => Ok(
896            TypedSqlGlobalAggregateTerminal::AvgField(resolve_target_slot(field.as_str())?),
897        ),
898        SqlGlobalAggregateTerminal::MinField(field) => Ok(
899            TypedSqlGlobalAggregateTerminal::MinField(resolve_target_slot(field.as_str())?),
900        ),
901        SqlGlobalAggregateTerminal::MaxField(field) => Ok(
902            TypedSqlGlobalAggregateTerminal::MaxField(resolve_target_slot(field.as_str())?),
903        ),
904    }
905}
906
907#[inline(never)]
908fn prepare_statement(
909    statement: SqlStatement,
910    expected_entity: &'static str,
911) -> Result<SqlStatement, SqlLoweringError> {
912    match statement {
913        SqlStatement::Select(statement) => Ok(SqlStatement::Select(prepare_select_statement(
914            statement,
915            expected_entity,
916        )?)),
917        SqlStatement::Delete(statement) => Ok(SqlStatement::Delete(prepare_delete_statement(
918            statement,
919            expected_entity,
920        )?)),
921        SqlStatement::Explain(statement) => Ok(SqlStatement::Explain(prepare_explain_statement(
922            statement,
923            expected_entity,
924        )?)),
925        SqlStatement::Describe(statement) => {
926            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
927
928            Ok(SqlStatement::Describe(statement))
929        }
930        SqlStatement::ShowIndexes(statement) => {
931            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
932
933            Ok(SqlStatement::ShowIndexes(statement))
934        }
935        SqlStatement::ShowColumns(statement) => {
936            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
937
938            Ok(SqlStatement::ShowColumns(statement))
939        }
940        SqlStatement::ShowEntities(statement) => Ok(SqlStatement::ShowEntities(statement)),
941    }
942}
943
944fn prepare_explain_statement(
945    statement: SqlExplainStatement,
946    expected_entity: &'static str,
947) -> Result<SqlExplainStatement, SqlLoweringError> {
948    let target = match statement.statement {
949        SqlExplainTarget::Select(select_statement) => {
950            SqlExplainTarget::Select(prepare_select_statement(select_statement, expected_entity)?)
951        }
952        SqlExplainTarget::Delete(delete_statement) => {
953            SqlExplainTarget::Delete(prepare_delete_statement(delete_statement, expected_entity)?)
954        }
955    };
956
957    Ok(SqlExplainStatement {
958        mode: statement.mode,
959        statement: target,
960    })
961}
962
963fn prepare_select_statement(
964    statement: SqlSelectStatement,
965    expected_entity: &'static str,
966) -> Result<SqlSelectStatement, SqlLoweringError> {
967    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
968
969    Ok(normalize_select_statement_to_expected_entity(
970        statement,
971        expected_entity,
972    ))
973}
974
975fn normalize_select_statement_to_expected_entity(
976    mut statement: SqlSelectStatement,
977    expected_entity: &'static str,
978) -> SqlSelectStatement {
979    // Re-scope parsed identifiers onto the resolved entity surface after the
980    // caller has already established entity ownership for this statement.
981    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
982    statement.projection =
983        normalize_projection_identifiers(statement.projection, entity_scope.as_slice());
984    statement.group_by = normalize_identifier_list(statement.group_by, entity_scope.as_slice());
985    statement.predicate = statement
986        .predicate
987        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
988    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
989    statement.having = normalize_having_clauses(statement.having, entity_scope.as_slice());
990
991    statement
992}
993
994fn prepare_delete_statement(
995    mut statement: SqlDeleteStatement,
996    expected_entity: &'static str,
997) -> Result<SqlDeleteStatement, SqlLoweringError> {
998    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
999    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
1000    statement.predicate = statement
1001        .predicate
1002        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
1003    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
1004
1005    Ok(statement)
1006}
1007
1008#[inline(never)]
1009fn lower_prepared_statement(
1010    statement: SqlStatement,
1011    primary_key_field: &str,
1012) -> Result<LoweredSqlCommand, SqlLoweringError> {
1013    match statement {
1014        SqlStatement::Select(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1015            LoweredSqlQuery::Select(lower_select_shape(statement, primary_key_field)?),
1016        ))),
1017        SqlStatement::Delete(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1018            LoweredSqlQuery::Delete(lower_delete_shape(statement)),
1019        ))),
1020        SqlStatement::Explain(statement) => lower_explain_prepared(statement, primary_key_field),
1021        SqlStatement::Describe(_) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::DescribeEntity)),
1022        SqlStatement::ShowIndexes(_) => {
1023            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowIndexesEntity))
1024        }
1025        SqlStatement::ShowColumns(_) => {
1026            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowColumnsEntity))
1027        }
1028        SqlStatement::ShowEntities(_) => {
1029            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowEntities))
1030        }
1031    }
1032}
1033
1034fn lower_explain_prepared(
1035    statement: SqlExplainStatement,
1036    primary_key_field: &str,
1037) -> Result<LoweredSqlCommand, SqlLoweringError> {
1038    let mode = statement.mode;
1039
1040    match statement.statement {
1041        SqlExplainTarget::Select(select_statement) => {
1042            lower_explain_select_prepared(select_statement, mode, primary_key_field)
1043        }
1044        SqlExplainTarget::Delete(delete_statement) => {
1045            Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1046                mode,
1047                query: LoweredSqlQuery::Delete(lower_delete_shape(delete_statement)),
1048            }))
1049        }
1050    }
1051}
1052
1053fn lower_explain_select_prepared(
1054    statement: SqlSelectStatement,
1055    mode: SqlExplainMode,
1056    primary_key_field: &str,
1057) -> Result<LoweredSqlCommand, SqlLoweringError> {
1058    match lower_select_shape(statement.clone(), primary_key_field) {
1059        Ok(query) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1060            mode,
1061            query: LoweredSqlQuery::Select(query),
1062        })),
1063        Err(SqlLoweringError::UnsupportedSelectProjection) => {
1064            let command = lower_global_aggregate_select_shape(statement)?;
1065
1066            Ok(LoweredSqlCommand(
1067                LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command },
1068            ))
1069        }
1070        Err(err) => Err(err),
1071    }
1072}
1073
1074fn lower_global_aggregate_select_shape(
1075    statement: SqlSelectStatement,
1076) -> Result<LoweredSqlGlobalAggregateCommand, SqlLoweringError> {
1077    let SqlSelectStatement {
1078        projection,
1079        predicate,
1080        distinct,
1081        group_by,
1082        having,
1083        order_by,
1084        limit,
1085        offset,
1086        entity: _,
1087    } = statement;
1088
1089    if distinct {
1090        return Err(SqlLoweringError::unsupported_select_distinct());
1091    }
1092    if !group_by.is_empty() {
1093        return Err(SqlLoweringError::unsupported_select_group_by());
1094    }
1095    if !having.is_empty() {
1096        return Err(SqlLoweringError::unsupported_select_having());
1097    }
1098
1099    let terminal = lower_global_aggregate_terminal(projection)?;
1100
1101    Ok(LoweredSqlGlobalAggregateCommand {
1102        query: LoweredBaseQueryShape {
1103            predicate,
1104            order_by,
1105            limit,
1106            offset,
1107        },
1108        terminal,
1109    })
1110}
1111
1112///
1113/// ResolvedHavingClause
1114///
1115/// Pre-resolved HAVING clause shape after SQL projection aggregate index
1116/// resolution. This keeps SQL shape analysis entity-agnostic before typed
1117/// query binding.
1118///
1119#[derive(Clone, Debug)]
1120enum ResolvedHavingClause {
1121    GroupField {
1122        field: String,
1123        op: crate::db::predicate::CompareOp,
1124        value: crate::value::Value,
1125    },
1126    Aggregate {
1127        aggregate_index: usize,
1128        op: crate::db::predicate::CompareOp,
1129        value: crate::value::Value,
1130    },
1131}
1132
1133///
1134/// LoweredSelectShape
1135///
1136/// Entity-agnostic lowered SQL SELECT shape prepared for typed `Query<E>`
1137/// binding.
1138///
1139#[derive(Clone, Debug)]
1140pub(crate) struct LoweredSelectShape {
1141    scalar_projection_fields: Option<Vec<String>>,
1142    grouped_projection_aggregates: Vec<SqlAggregateCall>,
1143    group_by_fields: Vec<String>,
1144    distinct: bool,
1145    having: Vec<ResolvedHavingClause>,
1146    predicate: Option<Predicate>,
1147    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1148    limit: Option<u32>,
1149    offset: Option<u32>,
1150}
1151
1152impl LoweredSelectShape {
1153    // Report whether this lowered select shape carries grouped execution state.
1154    const fn has_grouping(&self) -> bool {
1155        !self.group_by_fields.is_empty()
1156    }
1157}
1158
1159///
1160/// LoweredBaseQueryShape
1161///
1162/// Generic-free filter/order/window query modifiers shared by delete and
1163/// global-aggregate SQL lowering.
1164/// This keeps common SQL query-shape lowering shared before typed query
1165/// binding.
1166///
1167#[derive(Clone, Debug)]
1168pub(crate) struct LoweredBaseQueryShape {
1169    predicate: Option<Predicate>,
1170    order_by: Vec<SqlOrderTerm>,
1171    limit: Option<u32>,
1172    offset: Option<u32>,
1173}
1174
1175#[inline(never)]
1176fn lower_select_shape(
1177    statement: SqlSelectStatement,
1178    primary_key_field: &str,
1179) -> Result<LoweredSelectShape, SqlLoweringError> {
1180    let SqlSelectStatement {
1181        projection,
1182        predicate,
1183        distinct,
1184        group_by,
1185        having,
1186        order_by,
1187        limit,
1188        offset,
1189        entity: _,
1190    } = statement;
1191    let projection_for_having = projection.clone();
1192
1193    // Phase 1: resolve scalar/grouped projection shape.
1194    let (scalar_projection_fields, grouped_projection_aggregates) = if group_by.is_empty() {
1195        let scalar_projection_fields =
1196            lower_scalar_projection_fields(projection, distinct, primary_key_field)?;
1197        (scalar_projection_fields, Vec::new())
1198    } else {
1199        if distinct {
1200            return Err(SqlLoweringError::unsupported_select_distinct());
1201        }
1202        let grouped_projection_aggregates =
1203            grouped_projection_aggregate_calls(&projection, group_by.as_slice())?;
1204        (None, grouped_projection_aggregates)
1205    };
1206
1207    // Phase 2: resolve HAVING symbols against grouped projection authority.
1208    let having = lower_having_clauses(
1209        having,
1210        &projection_for_having,
1211        group_by.as_slice(),
1212        grouped_projection_aggregates.as_slice(),
1213    )?;
1214
1215    Ok(LoweredSelectShape {
1216        scalar_projection_fields,
1217        grouped_projection_aggregates,
1218        group_by_fields: group_by,
1219        distinct,
1220        having,
1221        predicate,
1222        order_by,
1223        limit,
1224        offset,
1225    })
1226}
1227
1228fn lower_scalar_projection_fields(
1229    projection: SqlProjection,
1230    distinct: bool,
1231    primary_key_field: &str,
1232) -> Result<Option<Vec<String>>, SqlLoweringError> {
1233    let SqlProjection::Items(items) = projection else {
1234        if distinct {
1235            return Ok(None);
1236        }
1237
1238        return Ok(None);
1239    };
1240
1241    let has_aggregate = items
1242        .iter()
1243        .any(|item| matches!(item, SqlSelectItem::Aggregate(_)));
1244    if has_aggregate {
1245        return Err(SqlLoweringError::unsupported_select_projection());
1246    }
1247
1248    let fields = items
1249        .into_iter()
1250        .map(|item| match item {
1251            SqlSelectItem::Field(field) => Ok(field),
1252            SqlSelectItem::Aggregate(_) | SqlSelectItem::TextFunction(_) => {
1253                Err(SqlLoweringError::unsupported_select_projection())
1254            }
1255        })
1256        .collect::<Result<Vec<_>, _>>()?;
1257
1258    validate_scalar_distinct_projection(distinct, fields.as_slice(), primary_key_field)?;
1259
1260    Ok(Some(fields))
1261}
1262
1263fn validate_scalar_distinct_projection(
1264    distinct: bool,
1265    projection_fields: &[String],
1266    primary_key_field: &str,
1267) -> Result<(), SqlLoweringError> {
1268    if !distinct {
1269        return Ok(());
1270    }
1271
1272    if projection_fields.is_empty() {
1273        return Ok(());
1274    }
1275
1276    let has_primary_key_field = projection_fields
1277        .iter()
1278        .any(|field| field == primary_key_field);
1279    if !has_primary_key_field {
1280        return Err(SqlLoweringError::unsupported_select_distinct());
1281    }
1282
1283    Ok(())
1284}
1285
1286fn lower_having_clauses(
1287    having_clauses: Vec<SqlHavingClause>,
1288    projection: &SqlProjection,
1289    group_by_fields: &[String],
1290    grouped_projection_aggregates: &[SqlAggregateCall],
1291) -> Result<Vec<ResolvedHavingClause>, SqlLoweringError> {
1292    if having_clauses.is_empty() {
1293        return Ok(Vec::new());
1294    }
1295    if group_by_fields.is_empty() {
1296        return Err(SqlLoweringError::unsupported_select_having());
1297    }
1298
1299    let projection_aggregates = grouped_projection_aggregate_calls(projection, group_by_fields)
1300        .map_err(|_| SqlLoweringError::unsupported_select_having())?;
1301    if projection_aggregates.as_slice() != grouped_projection_aggregates {
1302        return Err(SqlLoweringError::unsupported_select_having());
1303    }
1304
1305    let mut lowered = Vec::with_capacity(having_clauses.len());
1306    for clause in having_clauses {
1307        match clause.symbol {
1308            SqlHavingSymbol::Field(field) => lowered.push(ResolvedHavingClause::GroupField {
1309                field,
1310                op: clause.op,
1311                value: clause.value,
1312            }),
1313            SqlHavingSymbol::Aggregate(aggregate) => {
1314                let aggregate_index =
1315                    resolve_having_aggregate_index(&aggregate, grouped_projection_aggregates)?;
1316                lowered.push(ResolvedHavingClause::Aggregate {
1317                    aggregate_index,
1318                    op: clause.op,
1319                    value: clause.value,
1320                });
1321            }
1322        }
1323    }
1324
1325    Ok(lowered)
1326}
1327
1328// Canonicalize strict numeric SQL predicate literals onto the resolved model
1329// field kind so unsigned-width fields keep strict/indexable semantics even
1330// though reduced SQL integer tokens parse through one generic numeric value
1331// variant first.
1332fn canonicalize_sql_predicate_for_model(
1333    model: &'static EntityModel,
1334    predicate: Predicate,
1335) -> Predicate {
1336    match predicate {
1337        Predicate::And(children) => Predicate::And(
1338            children
1339                .into_iter()
1340                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1341                .collect(),
1342        ),
1343        Predicate::Or(children) => Predicate::Or(
1344            children
1345                .into_iter()
1346                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1347                .collect(),
1348        ),
1349        Predicate::Not(inner) => Predicate::Not(Box::new(canonicalize_sql_predicate_for_model(
1350            model, *inner,
1351        ))),
1352        Predicate::Compare(mut cmp) => {
1353            canonicalize_sql_compare_for_model(model, &mut cmp);
1354            Predicate::Compare(cmp)
1355        }
1356        Predicate::True
1357        | Predicate::False
1358        | Predicate::IsNull { .. }
1359        | Predicate::IsNotNull { .. }
1360        | Predicate::IsMissing { .. }
1361        | Predicate::IsEmpty { .. }
1362        | Predicate::IsNotEmpty { .. }
1363        | Predicate::TextContains { .. }
1364        | Predicate::TextContainsCi { .. } => predicate,
1365    }
1366}
1367
1368// Resolve one lowered predicate field onto the runtime model kind that owns
1369// its strict literal compatibility rules.
1370fn model_field_kind(model: &'static EntityModel, field: &str) -> Option<FieldKind> {
1371    model
1372        .fields()
1373        .iter()
1374        .find(|candidate| candidate.name() == field)
1375        .map(crate::model::field::FieldModel::kind)
1376}
1377
1378// Keep SQL-only literal widening narrow:
1379// - only strict equality-style numeric predicates are eligible
1380// - ordering already uses `NumericWiden`
1381// - text and expression-wrapped predicates stay untouched
1382fn canonicalize_sql_compare_for_model(
1383    model: &'static EntityModel,
1384    cmp: &mut crate::db::predicate::ComparePredicate,
1385) {
1386    if cmp.coercion.id != CoercionId::Strict {
1387        return;
1388    }
1389
1390    let Some(field_kind) = model_field_kind(model, &cmp.field) else {
1391        return;
1392    };
1393
1394    match cmp.op {
1395        CompareOp::Eq | CompareOp::Ne => {
1396            if let Some(value) =
1397                canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &cmp.value)
1398            {
1399                cmp.value = value;
1400            }
1401        }
1402        CompareOp::In | CompareOp::NotIn => {
1403            let Value::List(items) = &cmp.value else {
1404                return;
1405            };
1406
1407            let items = items
1408                .iter()
1409                .map(|item| {
1410                    canonicalize_strict_sql_numeric_value_for_kind(&field_kind, item)
1411                        .unwrap_or_else(|| item.clone())
1412                })
1413                .collect();
1414            cmp.value = Value::List(items);
1415        }
1416        CompareOp::Lt
1417        | CompareOp::Lte
1418        | CompareOp::Gt
1419        | CompareOp::Gte
1420        | CompareOp::Contains
1421        | CompareOp::StartsWith
1422        | CompareOp::EndsWith => {}
1423    }
1424}
1425
1426// Convert one parsed SQL numeric literal into the exact runtime `Value` variant
1427// required by the field kind when that conversion is lossless and unambiguous.
1428// This preserves strict equality semantics while still letting SQL express
1429// unsigned-width comparisons such as `Nat16`/`u64` fields.
1430fn canonicalize_strict_sql_numeric_value_for_kind(
1431    kind: &FieldKind,
1432    value: &Value,
1433) -> Option<Value> {
1434    match kind {
1435        FieldKind::Relation { key_kind, .. } => {
1436            canonicalize_strict_sql_numeric_value_for_kind(key_kind, value)
1437        }
1438        FieldKind::Int => match value {
1439            Value::Int(inner) => Some(Value::Int(*inner)),
1440            Value::Uint(inner) => i64::try_from(*inner).ok().map(Value::Int),
1441            _ => None,
1442        },
1443        FieldKind::Uint => match value {
1444            Value::Int(inner) => u64::try_from(*inner).ok().map(Value::Uint),
1445            Value::Uint(inner) => Some(Value::Uint(*inner)),
1446            _ => None,
1447        },
1448        FieldKind::Account
1449        | FieldKind::Blob
1450        | FieldKind::Bool
1451        | FieldKind::Date
1452        | FieldKind::Decimal { .. }
1453        | FieldKind::Duration
1454        | FieldKind::Enum { .. }
1455        | FieldKind::Float32
1456        | FieldKind::Float64
1457        | FieldKind::Int128
1458        | FieldKind::IntBig
1459        | FieldKind::List(_)
1460        | FieldKind::Map { .. }
1461        | FieldKind::Principal
1462        | FieldKind::Set(_)
1463        | FieldKind::Structured { .. }
1464        | FieldKind::Subaccount
1465        | FieldKind::Text
1466        | FieldKind::Timestamp
1467        | FieldKind::Uint128
1468        | FieldKind::UintBig
1469        | FieldKind::Ulid
1470        | FieldKind::Unit => None,
1471    }
1472}
1473
1474#[inline(never)]
1475pub(in crate::db) fn apply_lowered_select_shape(
1476    mut query: StructuralQuery,
1477    lowered: LoweredSelectShape,
1478) -> Result<StructuralQuery, SqlLoweringError> {
1479    let LoweredSelectShape {
1480        scalar_projection_fields,
1481        grouped_projection_aggregates,
1482        group_by_fields,
1483        distinct,
1484        having,
1485        predicate,
1486        order_by,
1487        limit,
1488        offset,
1489    } = lowered;
1490    let model = query.model();
1491
1492    // Phase 1: apply grouped declaration semantics.
1493    for field in group_by_fields {
1494        query = query.group_by(field)?;
1495    }
1496
1497    // Phase 2: apply scalar DISTINCT and projection contracts.
1498    if distinct {
1499        query = query.distinct();
1500    }
1501    if let Some(fields) = scalar_projection_fields {
1502        query = query.select_fields(fields);
1503    }
1504    for aggregate in grouped_projection_aggregates {
1505        query = query.aggregate(lower_aggregate_call(aggregate)?);
1506    }
1507
1508    // Phase 3: bind resolved HAVING clauses against grouped terminals.
1509    for clause in having {
1510        match clause {
1511            ResolvedHavingClause::GroupField { field, op, value } => {
1512                let value = model_field_kind(model, &field)
1513                    .and_then(|field_kind| {
1514                        canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &value)
1515                    })
1516                    .unwrap_or(value);
1517                query = query.having_group(field, op, value)?;
1518            }
1519            ResolvedHavingClause::Aggregate {
1520                aggregate_index,
1521                op,
1522                value,
1523            } => {
1524                query = query.having_aggregate(aggregate_index, op, value)?;
1525            }
1526        }
1527    }
1528
1529    // Phase 4: attach the shared filter/order/page tail through the base-query lane.
1530    Ok(apply_lowered_base_query_shape(
1531        query,
1532        LoweredBaseQueryShape {
1533            predicate: predicate
1534                .map(|predicate| canonicalize_sql_predicate_for_model(model, predicate)),
1535            order_by,
1536            limit,
1537            offset,
1538        },
1539    ))
1540}
1541
1542fn apply_lowered_base_query_shape(
1543    mut query: StructuralQuery,
1544    lowered: LoweredBaseQueryShape,
1545) -> StructuralQuery {
1546    if let Some(predicate) = lowered.predicate {
1547        query = query.filter(predicate);
1548    }
1549    query = apply_order_terms_structural(query, lowered.order_by);
1550    if let Some(limit) = lowered.limit {
1551        query = query.limit(limit);
1552    }
1553    if let Some(offset) = lowered.offset {
1554        query = query.offset(offset);
1555    }
1556
1557    query
1558}
1559
1560pub(in crate::db) fn bind_lowered_sql_query_structural(
1561    model: &'static crate::model::entity::EntityModel,
1562    lowered: LoweredSqlQuery,
1563    consistency: MissingRowPolicy,
1564) -> Result<StructuralQuery, SqlLoweringError> {
1565    match lowered {
1566        LoweredSqlQuery::Select(select) => {
1567            apply_lowered_select_shape(StructuralQuery::new(model, consistency), select)
1568        }
1569        LoweredSqlQuery::Delete(delete) => Ok(bind_lowered_sql_delete_query_structural(
1570            model,
1571            delete,
1572            consistency,
1573        )),
1574    }
1575}
1576
1577pub(in crate::db) fn bind_lowered_sql_delete_query_structural(
1578    model: &'static crate::model::entity::EntityModel,
1579    delete: LoweredBaseQueryShape,
1580    consistency: MissingRowPolicy,
1581) -> StructuralQuery {
1582    apply_lowered_base_query_shape(StructuralQuery::new(model, consistency).delete(), delete)
1583}
1584
1585pub(in crate::db) fn bind_lowered_sql_query<E: EntityKind>(
1586    lowered: LoweredSqlQuery,
1587    consistency: MissingRowPolicy,
1588) -> Result<Query<E>, SqlLoweringError> {
1589    let structural = bind_lowered_sql_query_structural(E::MODEL, lowered, consistency)?;
1590
1591    Ok(Query::from_inner(structural))
1592}
1593
1594fn bind_lowered_sql_global_aggregate_command<E: EntityKind>(
1595    lowered: LoweredSqlGlobalAggregateCommand,
1596    consistency: MissingRowPolicy,
1597) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
1598    let terminal = bind_lowered_sql_global_aggregate_terminal::<E>(lowered.terminal)?;
1599
1600    Ok(SqlGlobalAggregateCommand {
1601        query: Query::from_inner(apply_lowered_base_query_shape(
1602            StructuralQuery::new(E::MODEL, consistency),
1603            lowered.query,
1604        )),
1605        terminal,
1606    })
1607}
1608
1609fn bind_lowered_sql_global_aggregate_command_structural(
1610    model: &'static crate::model::entity::EntityModel,
1611    lowered: LoweredSqlGlobalAggregateCommand,
1612    consistency: MissingRowPolicy,
1613) -> SqlGlobalAggregateCommandCore {
1614    SqlGlobalAggregateCommandCore {
1615        query: apply_lowered_base_query_shape(
1616            StructuralQuery::new(model, consistency),
1617            lowered.query,
1618        ),
1619        terminal: lowered.terminal,
1620    }
1621}
1622
1623fn lower_global_aggregate_terminal(
1624    projection: SqlProjection,
1625) -> Result<SqlGlobalAggregateTerminal, SqlLoweringError> {
1626    let SqlProjection::Items(items) = projection else {
1627        return Err(SqlLoweringError::unsupported_select_projection());
1628    };
1629    if items.len() != 1 {
1630        return Err(SqlLoweringError::unsupported_select_projection());
1631    }
1632
1633    let Some(SqlSelectItem::Aggregate(aggregate)) = items.into_iter().next() else {
1634        return Err(SqlLoweringError::unsupported_select_projection());
1635    };
1636
1637    match lower_sql_aggregate_shape(aggregate)? {
1638        LoweredSqlAggregateShape::CountRows => Ok(SqlGlobalAggregateTerminal::CountRows),
1639        LoweredSqlAggregateShape::CountField(field) => {
1640            Ok(SqlGlobalAggregateTerminal::CountField(field))
1641        }
1642        LoweredSqlAggregateShape::FieldTarget {
1643            kind: SqlAggregateKind::Sum,
1644            field,
1645        } => Ok(SqlGlobalAggregateTerminal::SumField(field)),
1646        LoweredSqlAggregateShape::FieldTarget {
1647            kind: SqlAggregateKind::Avg,
1648            field,
1649        } => Ok(SqlGlobalAggregateTerminal::AvgField(field)),
1650        LoweredSqlAggregateShape::FieldTarget {
1651            kind: SqlAggregateKind::Min,
1652            field,
1653        } => Ok(SqlGlobalAggregateTerminal::MinField(field)),
1654        LoweredSqlAggregateShape::FieldTarget {
1655            kind: SqlAggregateKind::Max,
1656            field,
1657        } => Ok(SqlGlobalAggregateTerminal::MaxField(field)),
1658        LoweredSqlAggregateShape::FieldTarget {
1659            kind: SqlAggregateKind::Count,
1660            ..
1661        } => Err(SqlLoweringError::unsupported_select_projection()),
1662    }
1663}
1664
1665fn lower_sql_aggregate_shape(
1666    call: SqlAggregateCall,
1667) -> Result<LoweredSqlAggregateShape, SqlLoweringError> {
1668    match (call.kind, call.field) {
1669        (SqlAggregateKind::Count, None) => Ok(LoweredSqlAggregateShape::CountRows),
1670        (SqlAggregateKind::Count, Some(field)) => Ok(LoweredSqlAggregateShape::CountField(field)),
1671        (
1672            kind @ (SqlAggregateKind::Sum
1673            | SqlAggregateKind::Avg
1674            | SqlAggregateKind::Min
1675            | SqlAggregateKind::Max),
1676            Some(field),
1677        ) => Ok(LoweredSqlAggregateShape::FieldTarget { kind, field }),
1678        _ => Err(SqlLoweringError::unsupported_select_projection()),
1679    }
1680}
1681
1682fn grouped_projection_aggregate_calls(
1683    projection: &SqlProjection,
1684    group_by_fields: &[String],
1685) -> Result<Vec<SqlAggregateCall>, SqlLoweringError> {
1686    if group_by_fields.is_empty() {
1687        return Err(SqlLoweringError::unsupported_select_group_by());
1688    }
1689
1690    let SqlProjection::Items(items) = projection else {
1691        return Err(SqlLoweringError::unsupported_select_group_by());
1692    };
1693
1694    let mut projected_group_fields = Vec::<String>::new();
1695    let mut aggregate_calls = Vec::<SqlAggregateCall>::new();
1696    let mut seen_aggregate = false;
1697
1698    for item in items {
1699        match item {
1700            SqlSelectItem::Field(field) => {
1701                // Keep grouped projection deterministic and mappable to grouped
1702                // response contracts: group keys must be declared first.
1703                if seen_aggregate {
1704                    return Err(SqlLoweringError::unsupported_select_group_by());
1705                }
1706                projected_group_fields.push(field.clone());
1707            }
1708            SqlSelectItem::Aggregate(aggregate) => {
1709                seen_aggregate = true;
1710                aggregate_calls.push(aggregate.clone());
1711            }
1712            SqlSelectItem::TextFunction(_) => {
1713                return Err(SqlLoweringError::unsupported_select_group_by());
1714            }
1715        }
1716    }
1717
1718    if aggregate_calls.is_empty() || projected_group_fields.as_slice() != group_by_fields {
1719        return Err(SqlLoweringError::unsupported_select_group_by());
1720    }
1721
1722    Ok(aggregate_calls)
1723}
1724
1725fn lower_aggregate_call(
1726    call: SqlAggregateCall,
1727) -> Result<crate::db::query::builder::AggregateExpr, SqlLoweringError> {
1728    match lower_sql_aggregate_shape(call)? {
1729        LoweredSqlAggregateShape::CountRows => Ok(count()),
1730        LoweredSqlAggregateShape::CountField(field) => Ok(count_by(field)),
1731        LoweredSqlAggregateShape::FieldTarget {
1732            kind: SqlAggregateKind::Sum,
1733            field,
1734        } => Ok(sum(field)),
1735        LoweredSqlAggregateShape::FieldTarget {
1736            kind: SqlAggregateKind::Avg,
1737            field,
1738        } => Ok(avg(field)),
1739        LoweredSqlAggregateShape::FieldTarget {
1740            kind: SqlAggregateKind::Min,
1741            field,
1742        } => Ok(min_by(field)),
1743        LoweredSqlAggregateShape::FieldTarget {
1744            kind: SqlAggregateKind::Max,
1745            field,
1746        } => Ok(max_by(field)),
1747        LoweredSqlAggregateShape::FieldTarget {
1748            kind: SqlAggregateKind::Count,
1749            ..
1750        } => Err(SqlLoweringError::unsupported_select_projection()),
1751    }
1752}
1753
1754fn resolve_having_aggregate_index(
1755    target: &SqlAggregateCall,
1756    grouped_projection_aggregates: &[SqlAggregateCall],
1757) -> Result<usize, SqlLoweringError> {
1758    let mut matched = grouped_projection_aggregates
1759        .iter()
1760        .enumerate()
1761        .filter_map(|(index, aggregate)| (aggregate == target).then_some(index));
1762    let Some(index) = matched.next() else {
1763        return Err(SqlLoweringError::unsupported_select_having());
1764    };
1765    if matched.next().is_some() {
1766        return Err(SqlLoweringError::unsupported_select_having());
1767    }
1768
1769    Ok(index)
1770}
1771
1772fn lower_delete_shape(statement: SqlDeleteStatement) -> LoweredBaseQueryShape {
1773    let SqlDeleteStatement {
1774        predicate,
1775        order_by,
1776        limit,
1777        entity: _,
1778    } = statement;
1779
1780    LoweredBaseQueryShape {
1781        predicate,
1782        order_by,
1783        limit,
1784        offset: None,
1785    }
1786}
1787
1788fn apply_order_terms_structural(
1789    mut query: StructuralQuery,
1790    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1791) -> StructuralQuery {
1792    for term in order_by {
1793        query = match term.direction {
1794            SqlOrderDirection::Asc => query.order_by(term.field),
1795            SqlOrderDirection::Desc => query.order_by_desc(term.field),
1796        };
1797    }
1798
1799    query
1800}
1801
1802fn normalize_having_clauses(
1803    clauses: Vec<SqlHavingClause>,
1804    entity_scope: &[String],
1805) -> Vec<SqlHavingClause> {
1806    clauses
1807        .into_iter()
1808        .map(|clause| SqlHavingClause {
1809            symbol: normalize_having_symbol(clause.symbol, entity_scope),
1810            op: clause.op,
1811            value: clause.value,
1812        })
1813        .collect()
1814}
1815
1816fn normalize_having_symbol(symbol: SqlHavingSymbol, entity_scope: &[String]) -> SqlHavingSymbol {
1817    match symbol {
1818        SqlHavingSymbol::Field(field) => {
1819            SqlHavingSymbol::Field(normalize_identifier_to_scope(field, entity_scope))
1820        }
1821        SqlHavingSymbol::Aggregate(aggregate) => SqlHavingSymbol::Aggregate(
1822            normalize_aggregate_call_identifiers(aggregate, entity_scope),
1823        ),
1824    }
1825}
1826
1827fn normalize_aggregate_call_identifiers(
1828    aggregate: SqlAggregateCall,
1829    entity_scope: &[String],
1830) -> SqlAggregateCall {
1831    SqlAggregateCall {
1832        kind: aggregate.kind,
1833        field: aggregate
1834            .field
1835            .map(|field| normalize_identifier_to_scope(field, entity_scope)),
1836    }
1837}
1838
1839// Build one identifier scope used for reducing SQL-qualified field references
1840// (`entity.field`, `schema.entity.field`) into canonical planner field names.
1841fn sql_entity_scope_candidates(sql_entity: &str, expected_entity: &'static str) -> Vec<String> {
1842    let mut out = Vec::new();
1843    out.push(sql_entity.to_string());
1844    out.push(expected_entity.to_string());
1845
1846    if let Some(last) = identifier_last_segment(sql_entity) {
1847        out.push(last.to_string());
1848    }
1849    if let Some(last) = identifier_last_segment(expected_entity) {
1850        out.push(last.to_string());
1851    }
1852
1853    out
1854}
1855
1856fn normalize_projection_identifiers(
1857    projection: SqlProjection,
1858    entity_scope: &[String],
1859) -> SqlProjection {
1860    match projection {
1861        SqlProjection::All => SqlProjection::All,
1862        SqlProjection::Items(items) => SqlProjection::Items(
1863            items
1864                .into_iter()
1865                .map(|item| match item {
1866                    SqlSelectItem::Field(field) => {
1867                        SqlSelectItem::Field(normalize_identifier(field, entity_scope))
1868                    }
1869                    SqlSelectItem::Aggregate(aggregate) => {
1870                        SqlSelectItem::Aggregate(SqlAggregateCall {
1871                            kind: aggregate.kind,
1872                            field: aggregate
1873                                .field
1874                                .map(|field| normalize_identifier(field, entity_scope)),
1875                        })
1876                    }
1877                    SqlSelectItem::TextFunction(SqlTextFunctionCall {
1878                        function,
1879                        field,
1880                        literal,
1881                        literal2,
1882                        literal3,
1883                    }) => SqlSelectItem::TextFunction(SqlTextFunctionCall {
1884                        function,
1885                        field: normalize_identifier(field, entity_scope),
1886                        literal,
1887                        literal2,
1888                        literal3,
1889                    }),
1890                })
1891                .collect(),
1892        ),
1893    }
1894}
1895
1896fn normalize_order_terms(
1897    terms: Vec<crate::db::sql::parser::SqlOrderTerm>,
1898    entity_scope: &[String],
1899) -> Vec<crate::db::sql::parser::SqlOrderTerm> {
1900    terms
1901        .into_iter()
1902        .map(|term| crate::db::sql::parser::SqlOrderTerm {
1903            field: normalize_order_term_identifier(term.field, entity_scope),
1904            direction: term.direction,
1905        })
1906        .collect()
1907}
1908
1909fn normalize_order_term_identifier(identifier: String, entity_scope: &[String]) -> String {
1910    let Some(expression) = ExpressionOrderTerm::parse(identifier.as_str()) else {
1911        return normalize_identifier(identifier, entity_scope);
1912    };
1913    let normalized_field = normalize_identifier(expression.field().to_string(), entity_scope);
1914
1915    expression.canonical_text_with_field(normalized_field.as_str())
1916}
1917
1918fn normalize_identifier_list(fields: Vec<String>, entity_scope: &[String]) -> Vec<String> {
1919    fields
1920        .into_iter()
1921        .map(|field| normalize_identifier(field, entity_scope))
1922        .collect()
1923}
1924
1925// SQL lowering only adapts identifier qualification (`entity.field` -> `field`)
1926// and delegates predicate-tree traversal ownership to `db::predicate`.
1927fn adapt_predicate_identifiers_to_scope(
1928    predicate: Predicate,
1929    entity_scope: &[String],
1930) -> Predicate {
1931    rewrite_field_identifiers(predicate, |field| normalize_identifier(field, entity_scope))
1932}
1933
1934fn normalize_identifier(identifier: String, entity_scope: &[String]) -> String {
1935    normalize_identifier_to_scope(identifier, entity_scope)
1936}
1937
1938fn ensure_entity_matches_expected(
1939    sql_entity: &str,
1940    expected_entity: &'static str,
1941) -> Result<(), SqlLoweringError> {
1942    if identifiers_tail_match(sql_entity, expected_entity) {
1943        return Ok(());
1944    }
1945
1946    Err(SqlLoweringError::entity_mismatch(
1947        sql_entity,
1948        expected_entity,
1949    ))
1950}