Skip to main content

icydb_core/db/sql/lowering/
mod.rs

1//! Module: db::sql::lowering
2//! Responsibility: reduced SQL statement lowering into canonical query intent.
3//! Does not own: SQL tokenization/parsing, planner validation policy, or executor semantics.
4//! Boundary: frontend-only translation from parsed SQL statement contracts to `Query<E>`.
5
6///
7/// TESTS
8///
9
10#[cfg(test)]
11mod tests;
12
13use crate::{
14    db::{
15        predicate::{CoercionId, CompareOp, MissingRowPolicy, Predicate},
16        query::{
17            builder::aggregate::{avg, count, count_by, max_by, min_by, sum},
18            intent::{Query, QueryError, StructuralQuery},
19            plan::{
20                AggregateKind, ExpressionOrderTerm, FieldSlot, resolve_aggregate_target_field_slot,
21            },
22        },
23        sql::identifier::{
24            identifier_last_segment, identifiers_tail_match, normalize_identifier_to_scope,
25            rewrite_field_identifiers,
26        },
27        sql::parser::{
28            SqlAggregateCall, SqlAggregateKind, SqlDeleteStatement, SqlExplainMode,
29            SqlExplainStatement, SqlExplainTarget, SqlHavingClause, SqlHavingSymbol,
30            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
31            SqlStatement, SqlTextFunctionCall,
32        },
33    },
34    model::{entity::EntityModel, field::FieldKind},
35    traits::EntityKind,
36    value::Value,
37};
38use thiserror::Error as ThisError;
39
///
/// LoweredSqlCommand
///
/// Generic-free SQL command shape after reduced SQL parsing and entity-route
/// normalization.
/// This keeps statement-shape lowering shared across entities before typed
/// `Query<E>` binding happens at the execution boundary.
///
/// The wrapped `LoweredSqlCommandInner` payload is a private field, so the
/// concrete statement shape is only reachable through the accessors on this
/// type and the binding/classification functions in this module.
///
#[derive(Clone, Debug)]
pub struct LoweredSqlCommand(LoweredSqlCommandInner);
50
/// Private statement-shape payload wrapped by `LoweredSqlCommand`.
///
/// `lowered_sql_command_lane` maps every variant onto one
/// `LoweredSqlLaneKind`; both explain variants share the `Explain` lane.
#[derive(Clone, Debug)]
enum LoweredSqlCommandInner {
    /// One lowered executable query (select or delete shape).
    Query(LoweredSqlQuery),
    /// Explain request over one lowered query, tagged with its explain mode.
    Explain {
        mode: SqlExplainMode,
        query: LoweredSqlQuery,
    },
    /// Explain request over one lowered global aggregate command.
    ExplainGlobalAggregate {
        mode: SqlExplainMode,
        command: LoweredSqlGlobalAggregateCommand,
    },
    /// Payload-free shape reported as the `Describe` lane.
    DescribeEntity,
    /// Payload-free shape reported as the `ShowIndexes` lane.
    ShowIndexesEntity,
    /// Payload-free shape reported as the `ShowColumns` lane.
    ShowColumnsEntity,
    /// Payload-free shape reported as the `ShowEntities` lane.
    ShowEntities,
}
67
///
/// SqlCommand
///
/// Test-only typed SQL command shell over the shared lowered SQL surface.
/// Runtime dispatch now consumes `LoweredSqlCommand` directly, but lowering
/// tests still validate typed binding behavior on this local envelope.
///
/// The variants mirror `LoweredSqlCommandInner` one-to-one; payloads are the
/// typed (`Query<E>` / `SqlGlobalAggregateCommand<E>`) counterparts produced
/// by `bind_lowered_sql_command`.
///
#[cfg(test)]
#[derive(Debug)]
pub(crate) enum SqlCommand<E: EntityKind> {
    /// Typed query bound from a lowered query shape.
    Query(Query<E>),
    /// Explain request over one typed query.
    Explain {
        mode: SqlExplainMode,
        query: Query<E>,
    },
    /// Explain request over one typed global aggregate command.
    ExplainGlobalAggregate {
        mode: SqlExplainMode,
        command: SqlGlobalAggregateCommand<E>,
    },
    // The remaining variants carry no payload and pass through binding unchanged.
    DescribeEntity,
    ShowIndexesEntity,
    ShowColumnsEntity,
    ShowEntities,
}
92
93impl LoweredSqlCommand {
94    #[must_use]
95    pub(in crate::db) const fn query(&self) -> Option<&LoweredSqlQuery> {
96        match &self.0 {
97            LoweredSqlCommandInner::Query(query) => Some(query),
98            LoweredSqlCommandInner::Explain { .. }
99            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
100            | LoweredSqlCommandInner::DescribeEntity
101            | LoweredSqlCommandInner::ShowIndexesEntity
102            | LoweredSqlCommandInner::ShowColumnsEntity
103            | LoweredSqlCommandInner::ShowEntities => None,
104        }
105    }
106
107    #[must_use]
108    pub(in crate::db) fn into_query(self) -> Option<LoweredSqlQuery> {
109        match self.0 {
110            LoweredSqlCommandInner::Query(query) => Some(query),
111            LoweredSqlCommandInner::Explain { .. }
112            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
113            | LoweredSqlCommandInner::DescribeEntity
114            | LoweredSqlCommandInner::ShowIndexesEntity
115            | LoweredSqlCommandInner::ShowColumnsEntity
116            | LoweredSqlCommandInner::ShowEntities => None,
117        }
118    }
119
120    #[must_use]
121    pub(in crate::db) const fn explain_query(&self) -> Option<(SqlExplainMode, &LoweredSqlQuery)> {
122        match &self.0 {
123            LoweredSqlCommandInner::Explain { mode, query } => Some((*mode, query)),
124            LoweredSqlCommandInner::Query(_)
125            | LoweredSqlCommandInner::ExplainGlobalAggregate { .. }
126            | LoweredSqlCommandInner::DescribeEntity
127            | LoweredSqlCommandInner::ShowIndexesEntity
128            | LoweredSqlCommandInner::ShowColumnsEntity
129            | LoweredSqlCommandInner::ShowEntities => None,
130        }
131    }
132}
133
///
/// LoweredSqlQuery
///
/// Generic-free executable SQL query shape prepared before typed query binding.
/// Select and delete lowering stay shared until the final `Query<E>` build.
///
#[derive(Clone, Debug)]
pub(crate) enum LoweredSqlQuery {
    /// Lowered SELECT shape; may carry grouping (see `has_grouping`).
    Select(LoweredSelectShape),
    /// Lowered DELETE shape over the base query shape; never reports grouping.
    Delete(LoweredBaseQueryShape),
}
145
146impl LoweredSqlQuery {
147    // Report whether this lowered query carries grouped execution semantics.
148    pub(crate) const fn has_grouping(&self) -> bool {
149        match self {
150            Self::Select(select) => select.has_grouping(),
151            Self::Delete(_) => false,
152        }
153    }
154}
155
///
/// SqlGlobalAggregateTerminal
///
/// Global SQL aggregate terminals currently executable through dedicated
/// aggregate SQL entrypoints.
/// Field-target variants carry the target field name as an unresolved string;
/// `PreparedSqlScalarAggregateStrategy::from_lowered_terminal_with_model`
/// resolves it to a planner field slot against one concrete model.
///

#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum SqlGlobalAggregateTerminal {
    // Row count with no field target.
    CountRows,
    // Count over the named field.
    CountField(String),
    // Sum over the named field.
    SumField(String),
    // Average over the named field.
    AvgField(String),
    // Minimum over the named field.
    MinField(String),
    // Maximum over the named field.
    MaxField(String),
}
172
///
/// TypedSqlGlobalAggregateTerminal
///
/// TypedSqlGlobalAggregateTerminal is the typed global aggregate contract used
/// after entity binding resolves one concrete model.
/// Field-target variants carry a resolved planner field slot so typed SQL
/// aggregate execution does not re-resolve the same field name before dispatch.
///
/// Variants mirror `SqlGlobalAggregateTerminal` one-to-one, with the field
/// name replaced by its resolved `FieldSlot`.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) enum TypedSqlGlobalAggregateTerminal {
    // Row count with no field target.
    CountRows,
    // Count over the resolved field slot.
    CountField(FieldSlot),
    // Sum over the resolved field slot.
    SumField(FieldSlot),
    // Average over the resolved field slot.
    AvgField(FieldSlot),
    // Minimum over the resolved field slot.
    MinField(FieldSlot),
    // Maximum over the resolved field slot.
    MaxField(FieldSlot),
}
190
/// PreparedSqlScalarAggregateDomain
///
/// Typed SQL scalar aggregate execution domain selected before session runtime
/// dispatch. This keeps the typed aggregate lane explicit about which internal
/// execution family will consume the request.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateDomain {
    // Selected for `CountRows` terminals.
    ExistingRows,
    // Selected for `CountField` terminals.
    ProjectionField,
    // Selected for `SumField` and `AvgField` terminals.
    NumericField,
    // Selected for `MinField` and `MaxField` terminals.
    ScalarExtremaValue,
}
203
/// PreparedSqlScalarAggregateOrderingRequirement
///
/// Ordering sensitivity required by the selected typed SQL scalar aggregate
/// strategy. This keeps first-slice descriptor/explain consumers off local
/// kind checks when they need to know whether field order semantics matter.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateOrderingRequirement {
    // Assigned to count, sum, and avg strategies.
    None,
    // Assigned to min and max strategies.
    FieldOrder,
}
214
/// PreparedSqlScalarAggregateRowSource
///
/// Canonical row-source shape for one prepared typed SQL scalar aggregate
/// strategy. This describes what kind of row-derived data the execution family
/// ultimately consumes.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateRowSource {
    // Assigned to `CountRows` strategies.
    ExistingRows,
    // Assigned to `CountField` strategies.
    ProjectedField,
    // Assigned to `SumField` and `AvgField` strategies.
    NumericField,
    // Assigned to `MinField` and `MaxField` strategies.
    ExtremalWinnerField,
}
227
/// PreparedSqlScalarAggregateEmptySetBehavior
///
/// Canonical empty-window result behavior for one prepared typed SQL scalar
/// aggregate strategy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateEmptySetBehavior {
    // Assigned to both count strategies (`CountRows`, `CountField`).
    Zero,
    // Assigned to sum, avg, min, and max strategies.
    Null,
}
237
/// PreparedSqlScalarAggregateDescriptorShape
///
/// Stable typed SQL scalar aggregate descriptor shape derived once at the SQL
/// aggregate preparation boundary and reused by runtime/EXPLAIN projections.
///
/// Each shape corresponds to exactly one global aggregate terminal variant.
/// `PreparedSqlScalarAggregateStrategy::runtime_descriptor` and
/// `PreparedSqlScalarAggregateStrategy::aggregate_kind` are both derived from
/// this shape alone.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateDescriptorShape {
    CountRows,
    CountField,
    SumField,
    AvgField,
    MinField,
    MaxField,
}
251
/// PreparedSqlScalarAggregateRuntimeDescriptor
///
/// Stable runtime-family projection for one prepared typed SQL scalar
/// aggregate strategy.
/// Session SQL aggregate execution consumes this descriptor instead of
/// rebuilding runtime boundary choice from raw SQL terminal variants or
/// parallel metadata tuple matches.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum PreparedSqlScalarAggregateRuntimeDescriptor {
    // Produced for the `CountRows` descriptor shape.
    CountRows,
    // Produced for the `CountField` descriptor shape.
    CountField,
    // Produced for `SumField`/`AvgField`; `kind` is `Sum` or `Avg` accordingly.
    NumericField { kind: AggregateKind },
    // Produced for `MinField`/`MaxField`; `kind` is `Min` or `Max` accordingly.
    ExtremalWinnerField { kind: AggregateKind },
}
266
///
/// PreparedSqlScalarAggregateStrategy
///
/// PreparedSqlScalarAggregateStrategy is the single typed SQL scalar aggregate
/// behavior source for the first `0.71` slice.
/// It resolves aggregate domain, descriptor shape, target-slot ownership, and
/// runtime behavior once so runtime and EXPLAIN do not re-derive that
/// behavior from raw SQL terminal variants.
/// Explain-visible aggregate expressions are projected on demand from this
/// prepared strategy instead of being carried as owned execution metadata.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(crate) struct PreparedSqlScalarAggregateStrategy {
    // Resolved target field slot; `None` only for the `CountRows` strategy.
    target_slot: Option<FieldSlot>,
    // Execution domain chosen for the terminal.
    domain: PreparedSqlScalarAggregateDomain,
    // Whether field order semantics matter (min/max) or not.
    ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
    // Kind of row-derived data the strategy consumes.
    row_source: PreparedSqlScalarAggregateRowSource,
    // Result behavior on an empty window (`Zero` for counts, `Null` otherwise).
    empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
    // Shape label from which runtime descriptor and aggregate kind are derived.
    descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
}
287
288impl PreparedSqlScalarAggregateStrategy {
289    const fn new(
290        target_slot: Option<FieldSlot>,
291        domain: PreparedSqlScalarAggregateDomain,
292        ordering_requirement: PreparedSqlScalarAggregateOrderingRequirement,
293        row_source: PreparedSqlScalarAggregateRowSource,
294        empty_set_behavior: PreparedSqlScalarAggregateEmptySetBehavior,
295        descriptor_shape: PreparedSqlScalarAggregateDescriptorShape,
296    ) -> Self {
297        Self {
298            target_slot,
299            domain,
300            ordering_requirement,
301            row_source,
302            empty_set_behavior,
303            descriptor_shape,
304        }
305    }
306
307    fn from_typed_terminal(terminal: &TypedSqlGlobalAggregateTerminal) -> Self {
308        match terminal {
309            TypedSqlGlobalAggregateTerminal::CountRows => Self::new(
310                None,
311                PreparedSqlScalarAggregateDomain::ExistingRows,
312                PreparedSqlScalarAggregateOrderingRequirement::None,
313                PreparedSqlScalarAggregateRowSource::ExistingRows,
314                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
315                PreparedSqlScalarAggregateDescriptorShape::CountRows,
316            ),
317            TypedSqlGlobalAggregateTerminal::CountField(target_slot) => Self::new(
318                Some(target_slot.clone()),
319                PreparedSqlScalarAggregateDomain::ProjectionField,
320                PreparedSqlScalarAggregateOrderingRequirement::None,
321                PreparedSqlScalarAggregateRowSource::ProjectedField,
322                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
323                PreparedSqlScalarAggregateDescriptorShape::CountField,
324            ),
325            TypedSqlGlobalAggregateTerminal::SumField(target_slot) => Self::new(
326                Some(target_slot.clone()),
327                PreparedSqlScalarAggregateDomain::NumericField,
328                PreparedSqlScalarAggregateOrderingRequirement::None,
329                PreparedSqlScalarAggregateRowSource::NumericField,
330                PreparedSqlScalarAggregateEmptySetBehavior::Null,
331                PreparedSqlScalarAggregateDescriptorShape::SumField,
332            ),
333            TypedSqlGlobalAggregateTerminal::AvgField(target_slot) => Self::new(
334                Some(target_slot.clone()),
335                PreparedSqlScalarAggregateDomain::NumericField,
336                PreparedSqlScalarAggregateOrderingRequirement::None,
337                PreparedSqlScalarAggregateRowSource::NumericField,
338                PreparedSqlScalarAggregateEmptySetBehavior::Null,
339                PreparedSqlScalarAggregateDescriptorShape::AvgField,
340            ),
341            TypedSqlGlobalAggregateTerminal::MinField(target_slot) => Self::new(
342                Some(target_slot.clone()),
343                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
344                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
345                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
346                PreparedSqlScalarAggregateEmptySetBehavior::Null,
347                PreparedSqlScalarAggregateDescriptorShape::MinField,
348            ),
349            TypedSqlGlobalAggregateTerminal::MaxField(target_slot) => Self::new(
350                Some(target_slot.clone()),
351                PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
352                PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
353                PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
354                PreparedSqlScalarAggregateEmptySetBehavior::Null,
355                PreparedSqlScalarAggregateDescriptorShape::MaxField,
356            ),
357        }
358    }
359
360    fn from_lowered_terminal_with_model(
361        model: &'static EntityModel,
362        terminal: &SqlGlobalAggregateTerminal,
363    ) -> Result<Self, SqlLoweringError> {
364        let resolve_target_slot = |field: &str| {
365            resolve_aggregate_target_field_slot(model, field).map_err(SqlLoweringError::from)
366        };
367
368        match terminal {
369            SqlGlobalAggregateTerminal::CountRows => Ok(Self::new(
370                None,
371                PreparedSqlScalarAggregateDomain::ExistingRows,
372                PreparedSqlScalarAggregateOrderingRequirement::None,
373                PreparedSqlScalarAggregateRowSource::ExistingRows,
374                PreparedSqlScalarAggregateEmptySetBehavior::Zero,
375                PreparedSqlScalarAggregateDescriptorShape::CountRows,
376            )),
377            SqlGlobalAggregateTerminal::CountField(field) => {
378                let target_slot = resolve_target_slot(field.as_str())?;
379
380                Ok(Self::new(
381                    Some(target_slot),
382                    PreparedSqlScalarAggregateDomain::ProjectionField,
383                    PreparedSqlScalarAggregateOrderingRequirement::None,
384                    PreparedSqlScalarAggregateRowSource::ProjectedField,
385                    PreparedSqlScalarAggregateEmptySetBehavior::Zero,
386                    PreparedSqlScalarAggregateDescriptorShape::CountField,
387                ))
388            }
389            SqlGlobalAggregateTerminal::SumField(field) => {
390                let target_slot = resolve_target_slot(field.as_str())?;
391
392                Ok(Self::new(
393                    Some(target_slot),
394                    PreparedSqlScalarAggregateDomain::NumericField,
395                    PreparedSqlScalarAggregateOrderingRequirement::None,
396                    PreparedSqlScalarAggregateRowSource::NumericField,
397                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
398                    PreparedSqlScalarAggregateDescriptorShape::SumField,
399                ))
400            }
401            SqlGlobalAggregateTerminal::AvgField(field) => {
402                let target_slot = resolve_target_slot(field.as_str())?;
403
404                Ok(Self::new(
405                    Some(target_slot),
406                    PreparedSqlScalarAggregateDomain::NumericField,
407                    PreparedSqlScalarAggregateOrderingRequirement::None,
408                    PreparedSqlScalarAggregateRowSource::NumericField,
409                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
410                    PreparedSqlScalarAggregateDescriptorShape::AvgField,
411                ))
412            }
413            SqlGlobalAggregateTerminal::MinField(field) => {
414                let target_slot = resolve_target_slot(field.as_str())?;
415
416                Ok(Self::new(
417                    Some(target_slot),
418                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
419                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
420                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
421                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
422                    PreparedSqlScalarAggregateDescriptorShape::MinField,
423                ))
424            }
425            SqlGlobalAggregateTerminal::MaxField(field) => {
426                let target_slot = resolve_target_slot(field.as_str())?;
427
428                Ok(Self::new(
429                    Some(target_slot),
430                    PreparedSqlScalarAggregateDomain::ScalarExtremaValue,
431                    PreparedSqlScalarAggregateOrderingRequirement::FieldOrder,
432                    PreparedSqlScalarAggregateRowSource::ExtremalWinnerField,
433                    PreparedSqlScalarAggregateEmptySetBehavior::Null,
434                    PreparedSqlScalarAggregateDescriptorShape::MaxField,
435                ))
436            }
437        }
438    }
439
440    /// Borrow the resolved target slot when this prepared SQL scalar strategy is field-targeted.
441    #[must_use]
442    pub(crate) const fn target_slot(&self) -> Option<&FieldSlot> {
443        self.target_slot.as_ref()
444    }
445
446    /// Return the canonical typed SQL scalar aggregate domain.
447    #[cfg(test)]
448    #[must_use]
449    pub(crate) const fn domain(&self) -> PreparedSqlScalarAggregateDomain {
450        self.domain
451    }
452
453    /// Return the stable descriptor/runtime shape label for this prepared strategy.
454    #[cfg(test)]
455    #[must_use]
456    pub(crate) const fn descriptor_shape(&self) -> PreparedSqlScalarAggregateDescriptorShape {
457        self.descriptor_shape
458    }
459
460    /// Return the stable runtime-family projection for this prepared SQL
461    /// scalar aggregate strategy.
462    #[must_use]
463    pub(crate) const fn runtime_descriptor(&self) -> PreparedSqlScalarAggregateRuntimeDescriptor {
464        match self.descriptor_shape {
465            PreparedSqlScalarAggregateDescriptorShape::CountRows => {
466                PreparedSqlScalarAggregateRuntimeDescriptor::CountRows
467            }
468            PreparedSqlScalarAggregateDescriptorShape::CountField => {
469                PreparedSqlScalarAggregateRuntimeDescriptor::CountField
470            }
471            PreparedSqlScalarAggregateDescriptorShape::SumField => {
472                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
473                    kind: AggregateKind::Sum,
474                }
475            }
476            PreparedSqlScalarAggregateDescriptorShape::AvgField => {
477                PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
478                    kind: AggregateKind::Avg,
479                }
480            }
481            PreparedSqlScalarAggregateDescriptorShape::MinField => {
482                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
483                    kind: AggregateKind::Min,
484                }
485            }
486            PreparedSqlScalarAggregateDescriptorShape::MaxField => {
487                PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField {
488                    kind: AggregateKind::Max,
489                }
490            }
491        }
492    }
493
494    /// Return the canonical aggregate kind for this prepared SQL scalar strategy.
495    #[must_use]
496    pub(crate) const fn aggregate_kind(&self) -> AggregateKind {
497        match self.descriptor_shape {
498            PreparedSqlScalarAggregateDescriptorShape::CountRows
499            | PreparedSqlScalarAggregateDescriptorShape::CountField => AggregateKind::Count,
500            PreparedSqlScalarAggregateDescriptorShape::SumField => AggregateKind::Sum,
501            PreparedSqlScalarAggregateDescriptorShape::AvgField => AggregateKind::Avg,
502            PreparedSqlScalarAggregateDescriptorShape::MinField => AggregateKind::Min,
503            PreparedSqlScalarAggregateDescriptorShape::MaxField => AggregateKind::Max,
504        }
505    }
506
507    /// Return the projected field label for descriptor/explain projection when
508    /// this prepared strategy is field-targeted.
509    #[must_use]
510    pub(crate) fn projected_field(&self) -> Option<&str> {
511        self.target_slot().map(FieldSlot::field)
512    }
513
514    /// Return field-order sensitivity for this prepared SQL scalar aggregate strategy.
515    #[cfg(test)]
516    #[must_use]
517    pub(crate) const fn ordering_requirement(
518        &self,
519    ) -> PreparedSqlScalarAggregateOrderingRequirement {
520        self.ordering_requirement
521    }
522
523    /// Return the canonical row-source shape for this prepared strategy.
524    #[cfg(test)]
525    #[must_use]
526    pub(crate) const fn row_source(&self) -> PreparedSqlScalarAggregateRowSource {
527        self.row_source
528    }
529
530    /// Return empty-window behavior for this prepared SQL scalar aggregate strategy.
531    #[cfg(test)]
532    #[must_use]
533    pub(crate) const fn empty_set_behavior(&self) -> PreparedSqlScalarAggregateEmptySetBehavior {
534        self.empty_set_behavior
535    }
536}
537
///
/// LoweredSqlGlobalAggregateCommand
///
/// Generic-free global aggregate command shape prepared before typed query
/// binding.
/// This keeps aggregate SQL lowering shared across entities until the final
/// execution boundary converts the base query shape into `Query<E>`.
///
#[derive(Clone, Debug)]
pub(crate) struct LoweredSqlGlobalAggregateCommand {
    // Base query shape the aggregate runs over.
    query: LoweredBaseQueryShape,
    // Aggregate terminal with its field target still unresolved (by name).
    terminal: SqlGlobalAggregateTerminal,
}
551
///
/// LoweredSqlAggregateShape
///
/// Locally validated aggregate-call shape used by SQL lowering to avoid
/// duplicating `(SqlAggregateKind, field)` validation across lowering lanes.
///

enum LoweredSqlAggregateShape {
    // Row count with no field target.
    CountRows,
    // Count over the named field.
    CountField(String),
    // Any other field-targeted aggregate, tagged with its parsed kind.
    // NOTE(review): presumably limited to the sum/avg/min/max kinds; confirm
    // against the lowering code that constructs this shape.
    FieldTarget {
        kind: SqlAggregateKind,
        field: String,
    },
}
567
///
/// SqlGlobalAggregateCommand
///
/// Lowered global SQL aggregate command carrying base query shape plus terminal.
/// This is the typed counterpart of `LoweredSqlGlobalAggregateCommand`: the
/// query is bound to entity `E` and the terminal carries a resolved field slot.
///

#[derive(Debug)]
pub(crate) struct SqlGlobalAggregateCommand<E: EntityKind> {
    // Typed base query the aggregate runs over.
    query: Query<E>,
    // Aggregate terminal with its field target already resolved.
    terminal: TypedSqlGlobalAggregateTerminal,
}
579
580impl<E: EntityKind> SqlGlobalAggregateCommand<E> {
581    /// Borrow the lowered base query shape for aggregate execution.
582    #[must_use]
583    pub(crate) const fn query(&self) -> &Query<E> {
584        &self.query
585    }
586
587    /// Borrow the lowered aggregate terminal.
588    #[cfg(test)]
589    #[must_use]
590    pub(crate) const fn terminal(&self) -> &TypedSqlGlobalAggregateTerminal {
591        &self.terminal
592    }
593
594    /// Prepare one typed SQL scalar aggregate strategy from the typed command boundary.
595    #[must_use]
596    pub(crate) fn prepared_scalar_strategy(&self) -> PreparedSqlScalarAggregateStrategy {
597        PreparedSqlScalarAggregateStrategy::from_typed_terminal(&self.terminal)
598    }
599}
600
///
/// SqlGlobalAggregateCommandCore
///
/// Generic-free lowered global aggregate command bound onto the structural
/// query surface.
/// This keeps global aggregate EXPLAIN on the shared query/explain path until
/// a typed boundary is strictly required.
///

#[derive(Debug)]
pub(crate) struct SqlGlobalAggregateCommandCore {
    // Structural (entity-generic-free) query the aggregate runs over.
    query: StructuralQuery,
    // Aggregate terminal with its field target still unresolved (by name);
    // resolved on demand by `prepared_scalar_strategy_with_model`.
    terminal: SqlGlobalAggregateTerminal,
}
615
616impl SqlGlobalAggregateCommandCore {
617    /// Borrow the structural query payload for aggregate explain/execution.
618    #[must_use]
619    pub(in crate::db) const fn query(&self) -> &StructuralQuery {
620        &self.query
621    }
622
623    /// Prepare one structural SQL scalar aggregate strategy using one concrete model.
624    pub(in crate::db) fn prepared_scalar_strategy_with_model(
625        &self,
626        model: &'static EntityModel,
627    ) -> Result<PreparedSqlScalarAggregateStrategy, SqlLoweringError> {
628        PreparedSqlScalarAggregateStrategy::from_lowered_terminal_with_model(model, &self.terminal)
629    }
630}
631
///
/// SqlLoweringError
///
/// SQL frontend lowering failures before planner validation/execution.
///

#[derive(Debug, ThisError)]
pub(crate) enum SqlLoweringError {
    /// SQL parse failure; converted automatically via `#[from]` and displayed
    /// using the wrapped error's own message.
    #[error("{0}")]
    Parse(#[from] crate::db::sql::parser::SqlParseError),

    /// Query-layer failure, boxed; conversion is provided by the manual
    /// `From<QueryError>` impl in this module rather than `#[from]`.
    #[error("{0}")]
    Query(Box<QueryError>),

    /// The entity named in the SQL text differs from the entity the caller
    /// requested.
    #[error("SQL entity '{sql_entity}' does not match requested entity type '{expected_entity}'")]
    EntityMismatch {
        sql_entity: String,
        expected_entity: &'static str,
    },

    /// SELECT projection outside the supported forms listed in the message.
    #[error(
        "unsupported SQL SELECT projection; supported forms are SELECT *, field lists, or grouped aggregate shapes"
    )]
    UnsupportedSelectProjection,

    /// SELECT DISTINCT shape that lowering does not support.
    #[error("unsupported SQL SELECT DISTINCT")]
    UnsupportedSelectDistinct,

    /// GROUP BY projection shape that lowering does not support.
    #[error("unsupported SQL GROUP BY projection shape")]
    UnsupportedSelectGroupBy,

    /// HAVING shape that lowering does not support.
    #[error("unsupported SQL HAVING shape")]
    UnsupportedSelectHaving,
}
666
667impl SqlLoweringError {
668    /// Construct one entity-mismatch SQL lowering error.
669    fn entity_mismatch(sql_entity: impl Into<String>, expected_entity: &'static str) -> Self {
670        Self::EntityMismatch {
671            sql_entity: sql_entity.into(),
672            expected_entity,
673        }
674    }
675
676    /// Construct one unsupported SELECT projection SQL lowering error.
677    const fn unsupported_select_projection() -> Self {
678        Self::UnsupportedSelectProjection
679    }
680
681    /// Construct one unsupported SELECT DISTINCT SQL lowering error.
682    const fn unsupported_select_distinct() -> Self {
683        Self::UnsupportedSelectDistinct
684    }
685
686    /// Construct one unsupported SELECT GROUP BY shape SQL lowering error.
687    const fn unsupported_select_group_by() -> Self {
688        Self::UnsupportedSelectGroupBy
689    }
690
691    /// Construct one unsupported SELECT HAVING shape SQL lowering error.
692    const fn unsupported_select_having() -> Self {
693        Self::UnsupportedSelectHaving
694    }
695}
696
697impl From<QueryError> for SqlLoweringError {
698    fn from(value: QueryError) -> Self {
699        Self::Query(Box::new(value))
700    }
701}
702
///
/// PreparedSqlStatement
///
/// SQL statement envelope after entity-scope normalization and
/// entity-match validation for one target entity descriptor.
///
/// This pre-lowering contract is entity-agnostic and reusable across
/// dynamic SQL route branches before typed `Query<E>` binding.
///
/// Produced by `prepare_sql_statement` and consumed by
/// `lower_sql_command_from_prepared_statement`.
///

#[derive(Clone, Debug)]
pub(crate) struct PreparedSqlStatement {
    // Statement returned by `prepare_statement` for the expected entity.
    statement: SqlStatement,
}
717
/// LoweredSqlLaneKind
///
/// Coarse lane classification of one `LoweredSqlCommand`, as reported by
/// `lowered_sql_command_lane`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(crate) enum LoweredSqlLaneKind {
    // Plain executable query command.
    Query,
    // Either explain form (regular query or global aggregate).
    Explain,
    // Describe-entity command.
    Describe,
    // Show-indexes command.
    ShowIndexes,
    // Show-columns command.
    ShowColumns,
    // Show-entities command.
    ShowEntities,
}
727
/// Parse SQL text and lower the resulting statement into the typed command
/// envelope for `E`.
///
/// Parse failures surface as `SqlLoweringError`; a successfully parsed
/// statement is handed to `compile_sql_command_from_statement`.
#[cfg(test)]
pub(crate) fn compile_sql_command<E: EntityKind>(
    sql: &str,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    crate::db::sql::parser::parse_sql(sql)
        .map_err(SqlLoweringError::from)
        .and_then(|parsed| compile_sql_command_from_statement::<E>(parsed, consistency))
}
737
/// Lower an already parsed SQL statement into the typed command envelope for `E`.
///
/// The statement is first prepared against `E`'s model name, then lowered and
/// bound by `compile_sql_command_from_prepared_statement`.
#[cfg(test)]
pub(crate) fn compile_sql_command_from_statement<E: EntityKind>(
    statement: SqlStatement,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let expected_entity = E::MODEL.name();

    prepare_sql_statement(statement, expected_entity).and_then(|prepared| {
        compile_sql_command_from_prepared_statement::<E>(prepared, consistency)
    })
}
747
/// Lower a prepared SQL statement into the typed command envelope for `E`.
///
/// Lowering uses `E`'s primary-key field name; the generic-free result is
/// then bound to the typed query surface.
#[cfg(test)]
pub(crate) fn compile_sql_command_from_prepared_statement<E: EntityKind>(
    prepared: PreparedSqlStatement,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let primary_key_field = E::MODEL.primary_key.name;

    lower_sql_command_from_prepared_statement(prepared, primary_key_field)
        .and_then(|lowered| bind_lowered_sql_command::<E>(lowered, consistency))
}
758
759/// Lower one prepared SQL statement into one shared generic-free command shape.
760#[inline(never)]
761pub(crate) fn lower_sql_command_from_prepared_statement(
762    prepared: PreparedSqlStatement,
763    primary_key_field: &str,
764) -> Result<LoweredSqlCommand, SqlLoweringError> {
765    lower_prepared_statement(prepared.statement, primary_key_field)
766}
767
768pub(crate) const fn lowered_sql_command_lane(command: &LoweredSqlCommand) -> LoweredSqlLaneKind {
769    match command.0 {
770        LoweredSqlCommandInner::Query(_) => LoweredSqlLaneKind::Query,
771        LoweredSqlCommandInner::Explain { .. }
772        | LoweredSqlCommandInner::ExplainGlobalAggregate { .. } => LoweredSqlLaneKind::Explain,
773        LoweredSqlCommandInner::DescribeEntity => LoweredSqlLaneKind::Describe,
774        LoweredSqlCommandInner::ShowIndexesEntity => LoweredSqlLaneKind::ShowIndexes,
775        LoweredSqlCommandInner::ShowColumnsEntity => LoweredSqlLaneKind::ShowColumns,
776        LoweredSqlCommandInner::ShowEntities => LoweredSqlLaneKind::ShowEntities,
777    }
778}
779
780/// Return whether one parsed SQL statement is an executable constrained global
781/// aggregate shape owned by the dedicated aggregate lane.
782pub(in crate::db) fn is_sql_global_aggregate_statement(statement: &SqlStatement) -> bool {
783    let SqlStatement::Select(statement) = statement else {
784        return false;
785    };
786
787    is_sql_global_aggregate_select(statement)
788}
789
790// Detect one constrained global aggregate select shape without widening any
791// non-aggregate SQL surface onto the dedicated aggregate execution lane.
792fn is_sql_global_aggregate_select(statement: &SqlSelectStatement) -> bool {
793    if statement.distinct || !statement.group_by.is_empty() || !statement.having.is_empty() {
794        return false;
795    }
796
797    lower_global_aggregate_terminal(statement.projection.clone()).is_ok()
798}
799
800/// Bind one lowered global aggregate EXPLAIN shape onto the structural query
801/// surface when the explain command carries that specialized form.
802pub(crate) fn bind_lowered_sql_explain_global_aggregate_structural(
803    lowered: &LoweredSqlCommand,
804    model: &'static crate::model::entity::EntityModel,
805    consistency: MissingRowPolicy,
806) -> Option<(SqlExplainMode, SqlGlobalAggregateCommandCore)> {
807    let LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } = &lowered.0 else {
808        return None;
809    };
810
811    Some((
812        *mode,
813        bind_lowered_sql_global_aggregate_command_structural(model, command.clone(), consistency),
814    ))
815}
816
/// Bind one generic-free lowered SQL command onto the typed command surface
/// for entity `E`.
///
/// Query-carrying shapes are bound through the typed query binders and may
/// fail; the describe/show shapes map directly onto their typed counterparts.
#[cfg(test)]
pub(crate) fn bind_lowered_sql_command<E: EntityKind>(
    lowered: LoweredSqlCommand,
    consistency: MissingRowPolicy,
) -> Result<SqlCommand<E>, SqlLoweringError> {
    let LoweredSqlCommand(inner) = lowered;

    let bound = match inner {
        LoweredSqlCommandInner::Query(query) => {
            SqlCommand::Query(bind_lowered_sql_query::<E>(query, consistency)?)
        }
        LoweredSqlCommandInner::Explain { mode, query } => {
            let query = bind_lowered_sql_query::<E>(query, consistency)?;

            SqlCommand::Explain { mode, query }
        }
        LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command } => {
            let command = bind_lowered_sql_global_aggregate_command::<E>(command, consistency)?;

            SqlCommand::ExplainGlobalAggregate { mode, command }
        }
        LoweredSqlCommandInner::DescribeEntity => SqlCommand::DescribeEntity,
        LoweredSqlCommandInner::ShowIndexesEntity => SqlCommand::ShowIndexesEntity,
        LoweredSqlCommandInner::ShowColumnsEntity => SqlCommand::ShowColumnsEntity,
        LoweredSqlCommandInner::ShowEntities => SqlCommand::ShowEntities,
    };

    Ok(bound)
}
844
845/// Prepare one parsed SQL statement for one expected entity route.
846#[inline(never)]
847pub(crate) fn prepare_sql_statement(
848    statement: SqlStatement,
849    expected_entity: &'static str,
850) -> Result<PreparedSqlStatement, SqlLoweringError> {
851    let statement = prepare_statement(statement, expected_entity)?;
852
853    Ok(PreparedSqlStatement { statement })
854}
855
/// Parse `sql`, prepare it for entity `E`, and lower it into the global
/// aggregate execution command for `E`.
#[cfg(test)]
pub(crate) fn compile_sql_global_aggregate_command<E: EntityKind>(
    sql: &str,
    consistency: MissingRowPolicy,
) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
    let parsed = crate::db::sql::parser::parse_sql(sql)?;

    compile_sql_global_aggregate_command_from_prepared::<E>(
        prepare_sql_statement(parsed, E::MODEL.name())?,
        consistency,
    )
}
866
867// Lower one already-prepared SQL statement into the constrained global
868// aggregate command envelope so callers that already parsed and routed the
869// statement do not pay the parser again.
870pub(crate) fn compile_sql_global_aggregate_command_from_prepared<E: EntityKind>(
871    prepared: PreparedSqlStatement,
872    consistency: MissingRowPolicy,
873) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
874    let SqlStatement::Select(statement) = prepared.statement else {
875        return Err(SqlLoweringError::unsupported_select_projection());
876    };
877
878    bind_lowered_sql_global_aggregate_command::<E>(
879        lower_global_aggregate_select_shape(statement)?,
880        consistency,
881    )
882}
883
884fn bind_lowered_sql_global_aggregate_terminal<E: EntityKind>(
885    terminal: SqlGlobalAggregateTerminal,
886) -> Result<TypedSqlGlobalAggregateTerminal, SqlLoweringError> {
887    let resolve_target_slot = |field: &str| {
888        resolve_aggregate_target_field_slot(E::MODEL, field).map_err(SqlLoweringError::from)
889    };
890
891    match terminal {
892        SqlGlobalAggregateTerminal::CountRows => Ok(TypedSqlGlobalAggregateTerminal::CountRows),
893        SqlGlobalAggregateTerminal::CountField(field) => Ok(
894            TypedSqlGlobalAggregateTerminal::CountField(resolve_target_slot(field.as_str())?),
895        ),
896        SqlGlobalAggregateTerminal::SumField(field) => Ok(
897            TypedSqlGlobalAggregateTerminal::SumField(resolve_target_slot(field.as_str())?),
898        ),
899        SqlGlobalAggregateTerminal::AvgField(field) => Ok(
900            TypedSqlGlobalAggregateTerminal::AvgField(resolve_target_slot(field.as_str())?),
901        ),
902        SqlGlobalAggregateTerminal::MinField(field) => Ok(
903            TypedSqlGlobalAggregateTerminal::MinField(resolve_target_slot(field.as_str())?),
904        ),
905        SqlGlobalAggregateTerminal::MaxField(field) => Ok(
906            TypedSqlGlobalAggregateTerminal::MaxField(resolve_target_slot(field.as_str())?),
907        ),
908    }
909}
910
911#[inline(never)]
912fn prepare_statement(
913    statement: SqlStatement,
914    expected_entity: &'static str,
915) -> Result<SqlStatement, SqlLoweringError> {
916    match statement {
917        SqlStatement::Select(statement) => Ok(SqlStatement::Select(prepare_select_statement(
918            statement,
919            expected_entity,
920        )?)),
921        SqlStatement::Delete(statement) => Ok(SqlStatement::Delete(prepare_delete_statement(
922            statement,
923            expected_entity,
924        )?)),
925        SqlStatement::Explain(statement) => Ok(SqlStatement::Explain(prepare_explain_statement(
926            statement,
927            expected_entity,
928        )?)),
929        SqlStatement::Describe(statement) => {
930            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
931
932            Ok(SqlStatement::Describe(statement))
933        }
934        SqlStatement::ShowIndexes(statement) => {
935            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
936
937            Ok(SqlStatement::ShowIndexes(statement))
938        }
939        SqlStatement::ShowColumns(statement) => {
940            ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
941
942            Ok(SqlStatement::ShowColumns(statement))
943        }
944        SqlStatement::ShowEntities(statement) => Ok(SqlStatement::ShowEntities(statement)),
945    }
946}
947
948fn prepare_explain_statement(
949    statement: SqlExplainStatement,
950    expected_entity: &'static str,
951) -> Result<SqlExplainStatement, SqlLoweringError> {
952    let target = match statement.statement {
953        SqlExplainTarget::Select(select_statement) => {
954            SqlExplainTarget::Select(prepare_select_statement(select_statement, expected_entity)?)
955        }
956        SqlExplainTarget::Delete(delete_statement) => {
957            SqlExplainTarget::Delete(prepare_delete_statement(delete_statement, expected_entity)?)
958        }
959    };
960
961    Ok(SqlExplainStatement {
962        mode: statement.mode,
963        statement: target,
964    })
965}
966
967fn prepare_select_statement(
968    statement: SqlSelectStatement,
969    expected_entity: &'static str,
970) -> Result<SqlSelectStatement, SqlLoweringError> {
971    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
972
973    Ok(normalize_select_statement_to_expected_entity(
974        statement,
975        expected_entity,
976    ))
977}
978
979fn normalize_select_statement_to_expected_entity(
980    mut statement: SqlSelectStatement,
981    expected_entity: &'static str,
982) -> SqlSelectStatement {
983    // Re-scope parsed identifiers onto the resolved entity surface after the
984    // caller has already established entity ownership for this statement.
985    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
986    statement.projection =
987        normalize_projection_identifiers(statement.projection, entity_scope.as_slice());
988    statement.group_by = normalize_identifier_list(statement.group_by, entity_scope.as_slice());
989    statement.predicate = statement
990        .predicate
991        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
992    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
993    statement.having = normalize_having_clauses(statement.having, entity_scope.as_slice());
994
995    statement
996}
997
998fn prepare_delete_statement(
999    mut statement: SqlDeleteStatement,
1000    expected_entity: &'static str,
1001) -> Result<SqlDeleteStatement, SqlLoweringError> {
1002    ensure_entity_matches_expected(statement.entity.as_str(), expected_entity)?;
1003    let entity_scope = sql_entity_scope_candidates(statement.entity.as_str(), expected_entity);
1004    statement.predicate = statement
1005        .predicate
1006        .map(|predicate| adapt_predicate_identifiers_to_scope(predicate, entity_scope.as_slice()));
1007    statement.order_by = normalize_order_terms(statement.order_by, entity_scope.as_slice());
1008
1009    Ok(statement)
1010}
1011
1012#[inline(never)]
1013fn lower_prepared_statement(
1014    statement: SqlStatement,
1015    primary_key_field: &str,
1016) -> Result<LoweredSqlCommand, SqlLoweringError> {
1017    match statement {
1018        SqlStatement::Select(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1019            LoweredSqlQuery::Select(lower_select_shape(statement, primary_key_field)?),
1020        ))),
1021        SqlStatement::Delete(statement) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Query(
1022            LoweredSqlQuery::Delete(lower_delete_shape(statement)),
1023        ))),
1024        SqlStatement::Explain(statement) => lower_explain_prepared(statement, primary_key_field),
1025        SqlStatement::Describe(_) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::DescribeEntity)),
1026        SqlStatement::ShowIndexes(_) => {
1027            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowIndexesEntity))
1028        }
1029        SqlStatement::ShowColumns(_) => {
1030            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowColumnsEntity))
1031        }
1032        SqlStatement::ShowEntities(_) => {
1033            Ok(LoweredSqlCommand(LoweredSqlCommandInner::ShowEntities))
1034        }
1035    }
1036}
1037
1038fn lower_explain_prepared(
1039    statement: SqlExplainStatement,
1040    primary_key_field: &str,
1041) -> Result<LoweredSqlCommand, SqlLoweringError> {
1042    let mode = statement.mode;
1043
1044    match statement.statement {
1045        SqlExplainTarget::Select(select_statement) => {
1046            lower_explain_select_prepared(select_statement, mode, primary_key_field)
1047        }
1048        SqlExplainTarget::Delete(delete_statement) => {
1049            Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1050                mode,
1051                query: LoweredSqlQuery::Delete(lower_delete_shape(delete_statement)),
1052            }))
1053        }
1054    }
1055}
1056
1057fn lower_explain_select_prepared(
1058    statement: SqlSelectStatement,
1059    mode: SqlExplainMode,
1060    primary_key_field: &str,
1061) -> Result<LoweredSqlCommand, SqlLoweringError> {
1062    match lower_select_shape(statement.clone(), primary_key_field) {
1063        Ok(query) => Ok(LoweredSqlCommand(LoweredSqlCommandInner::Explain {
1064            mode,
1065            query: LoweredSqlQuery::Select(query),
1066        })),
1067        Err(SqlLoweringError::UnsupportedSelectProjection) => {
1068            let command = lower_global_aggregate_select_shape(statement)?;
1069
1070            Ok(LoweredSqlCommand(
1071                LoweredSqlCommandInner::ExplainGlobalAggregate { mode, command },
1072            ))
1073        }
1074        Err(err) => Err(err),
1075    }
1076}
1077
1078fn lower_global_aggregate_select_shape(
1079    statement: SqlSelectStatement,
1080) -> Result<LoweredSqlGlobalAggregateCommand, SqlLoweringError> {
1081    let SqlSelectStatement {
1082        projection,
1083        predicate,
1084        distinct,
1085        group_by,
1086        having,
1087        order_by,
1088        limit,
1089        offset,
1090        entity: _,
1091    } = statement;
1092
1093    if distinct {
1094        return Err(SqlLoweringError::unsupported_select_distinct());
1095    }
1096    if !group_by.is_empty() {
1097        return Err(SqlLoweringError::unsupported_select_group_by());
1098    }
1099    if !having.is_empty() {
1100        return Err(SqlLoweringError::unsupported_select_having());
1101    }
1102
1103    let terminal = lower_global_aggregate_terminal(projection)?;
1104
1105    Ok(LoweredSqlGlobalAggregateCommand {
1106        query: LoweredBaseQueryShape {
1107            predicate,
1108            order_by,
1109            limit,
1110            offset,
1111        },
1112        terminal,
1113    })
1114}
1115
1116///
1117/// ResolvedHavingClause
1118///
1119/// Pre-resolved HAVING clause shape after SQL projection aggregate index
1120/// resolution. This keeps SQL shape analysis entity-agnostic before typed
1121/// query binding.
1122///
1123#[derive(Clone, Debug)]
1124enum ResolvedHavingClause {
1125    GroupField {
1126        field: String,
1127        op: crate::db::predicate::CompareOp,
1128        value: crate::value::Value,
1129    },
1130    Aggregate {
1131        aggregate_index: usize,
1132        op: crate::db::predicate::CompareOp,
1133        value: crate::value::Value,
1134    },
1135}
1136
1137///
1138/// LoweredSelectShape
1139///
1140/// Entity-agnostic lowered SQL SELECT shape prepared for typed `Query<E>`
1141/// binding.
1142///
1143#[derive(Clone, Debug)]
1144pub(crate) struct LoweredSelectShape {
1145    scalar_projection_fields: Option<Vec<String>>,
1146    grouped_projection_aggregates: Vec<SqlAggregateCall>,
1147    group_by_fields: Vec<String>,
1148    distinct: bool,
1149    having: Vec<ResolvedHavingClause>,
1150    predicate: Option<Predicate>,
1151    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1152    limit: Option<u32>,
1153    offset: Option<u32>,
1154}
1155
1156impl LoweredSelectShape {
1157    // Report whether this lowered select shape carries grouped execution state.
1158    const fn has_grouping(&self) -> bool {
1159        !self.group_by_fields.is_empty()
1160    }
1161}
1162
1163///
1164/// LoweredBaseQueryShape
1165///
1166/// Generic-free filter/order/window query modifiers shared by delete and
1167/// global-aggregate SQL lowering.
1168/// This keeps common SQL query-shape lowering shared before typed query
1169/// binding.
1170///
1171#[derive(Clone, Debug)]
1172pub(crate) struct LoweredBaseQueryShape {
1173    predicate: Option<Predicate>,
1174    order_by: Vec<SqlOrderTerm>,
1175    limit: Option<u32>,
1176    offset: Option<u32>,
1177}
1178
1179#[inline(never)]
1180fn lower_select_shape(
1181    statement: SqlSelectStatement,
1182    primary_key_field: &str,
1183) -> Result<LoweredSelectShape, SqlLoweringError> {
1184    let SqlSelectStatement {
1185        projection,
1186        predicate,
1187        distinct,
1188        group_by,
1189        having,
1190        order_by,
1191        limit,
1192        offset,
1193        entity: _,
1194    } = statement;
1195    let projection_for_having = projection.clone();
1196
1197    // Phase 1: resolve scalar/grouped projection shape.
1198    let (scalar_projection_fields, grouped_projection_aggregates) = if group_by.is_empty() {
1199        let scalar_projection_fields =
1200            lower_scalar_projection_fields(projection, distinct, primary_key_field)?;
1201        (scalar_projection_fields, Vec::new())
1202    } else {
1203        if distinct {
1204            return Err(SqlLoweringError::unsupported_select_distinct());
1205        }
1206        let grouped_projection_aggregates =
1207            grouped_projection_aggregate_calls(&projection, group_by.as_slice())?;
1208        (None, grouped_projection_aggregates)
1209    };
1210
1211    // Phase 2: resolve HAVING symbols against grouped projection authority.
1212    let having = lower_having_clauses(
1213        having,
1214        &projection_for_having,
1215        group_by.as_slice(),
1216        grouped_projection_aggregates.as_slice(),
1217    )?;
1218
1219    Ok(LoweredSelectShape {
1220        scalar_projection_fields,
1221        grouped_projection_aggregates,
1222        group_by_fields: group_by,
1223        distinct,
1224        having,
1225        predicate,
1226        order_by,
1227        limit,
1228        offset,
1229    })
1230}
1231
1232fn lower_scalar_projection_fields(
1233    projection: SqlProjection,
1234    distinct: bool,
1235    primary_key_field: &str,
1236) -> Result<Option<Vec<String>>, SqlLoweringError> {
1237    let SqlProjection::Items(items) = projection else {
1238        if distinct {
1239            return Ok(None);
1240        }
1241
1242        return Ok(None);
1243    };
1244
1245    let has_aggregate = items
1246        .iter()
1247        .any(|item| matches!(item, SqlSelectItem::Aggregate(_)));
1248    if has_aggregate {
1249        return Err(SqlLoweringError::unsupported_select_projection());
1250    }
1251
1252    let fields = items
1253        .into_iter()
1254        .map(|item| match item {
1255            SqlSelectItem::Field(field) => Ok(field),
1256            SqlSelectItem::Aggregate(_) | SqlSelectItem::TextFunction(_) => {
1257                Err(SqlLoweringError::unsupported_select_projection())
1258            }
1259        })
1260        .collect::<Result<Vec<_>, _>>()?;
1261
1262    validate_scalar_distinct_projection(distinct, fields.as_slice(), primary_key_field)?;
1263
1264    Ok(Some(fields))
1265}
1266
1267fn validate_scalar_distinct_projection(
1268    distinct: bool,
1269    projection_fields: &[String],
1270    primary_key_field: &str,
1271) -> Result<(), SqlLoweringError> {
1272    if !distinct {
1273        return Ok(());
1274    }
1275
1276    if projection_fields.is_empty() {
1277        return Ok(());
1278    }
1279
1280    let has_primary_key_field = projection_fields
1281        .iter()
1282        .any(|field| field == primary_key_field);
1283    if !has_primary_key_field {
1284        return Err(SqlLoweringError::unsupported_select_distinct());
1285    }
1286
1287    Ok(())
1288}
1289
1290fn lower_having_clauses(
1291    having_clauses: Vec<SqlHavingClause>,
1292    projection: &SqlProjection,
1293    group_by_fields: &[String],
1294    grouped_projection_aggregates: &[SqlAggregateCall],
1295) -> Result<Vec<ResolvedHavingClause>, SqlLoweringError> {
1296    if having_clauses.is_empty() {
1297        return Ok(Vec::new());
1298    }
1299    if group_by_fields.is_empty() {
1300        return Err(SqlLoweringError::unsupported_select_having());
1301    }
1302
1303    let projection_aggregates = grouped_projection_aggregate_calls(projection, group_by_fields)
1304        .map_err(|_| SqlLoweringError::unsupported_select_having())?;
1305    if projection_aggregates.as_slice() != grouped_projection_aggregates {
1306        return Err(SqlLoweringError::unsupported_select_having());
1307    }
1308
1309    let mut lowered = Vec::with_capacity(having_clauses.len());
1310    for clause in having_clauses {
1311        match clause.symbol {
1312            SqlHavingSymbol::Field(field) => lowered.push(ResolvedHavingClause::GroupField {
1313                field,
1314                op: clause.op,
1315                value: clause.value,
1316            }),
1317            SqlHavingSymbol::Aggregate(aggregate) => {
1318                let aggregate_index =
1319                    resolve_having_aggregate_index(&aggregate, grouped_projection_aggregates)?;
1320                lowered.push(ResolvedHavingClause::Aggregate {
1321                    aggregate_index,
1322                    op: clause.op,
1323                    value: clause.value,
1324                });
1325            }
1326        }
1327    }
1328
1329    Ok(lowered)
1330}
1331
1332// Canonicalize strict numeric SQL predicate literals onto the resolved model
1333// field kind so unsigned-width fields keep strict/indexable semantics even
1334// though reduced SQL integer tokens parse through one generic numeric value
1335// variant first.
1336fn canonicalize_sql_predicate_for_model(
1337    model: &'static EntityModel,
1338    predicate: Predicate,
1339) -> Predicate {
1340    match predicate {
1341        Predicate::And(children) => Predicate::And(
1342            children
1343                .into_iter()
1344                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1345                .collect(),
1346        ),
1347        Predicate::Or(children) => Predicate::Or(
1348            children
1349                .into_iter()
1350                .map(|child| canonicalize_sql_predicate_for_model(model, child))
1351                .collect(),
1352        ),
1353        Predicate::Not(inner) => Predicate::Not(Box::new(canonicalize_sql_predicate_for_model(
1354            model, *inner,
1355        ))),
1356        Predicate::Compare(mut cmp) => {
1357            canonicalize_sql_compare_for_model(model, &mut cmp);
1358            Predicate::Compare(cmp)
1359        }
1360        Predicate::True
1361        | Predicate::False
1362        | Predicate::IsNull { .. }
1363        | Predicate::IsNotNull { .. }
1364        | Predicate::IsMissing { .. }
1365        | Predicate::IsEmpty { .. }
1366        | Predicate::IsNotEmpty { .. }
1367        | Predicate::TextContains { .. }
1368        | Predicate::TextContainsCi { .. } => predicate,
1369    }
1370}
1371
1372// Resolve one lowered predicate field onto the runtime model kind that owns
1373// its strict literal compatibility rules.
1374fn model_field_kind(model: &'static EntityModel, field: &str) -> Option<FieldKind> {
1375    model
1376        .fields()
1377        .iter()
1378        .find(|candidate| candidate.name() == field)
1379        .map(crate::model::field::FieldModel::kind)
1380}
1381
1382// Keep SQL-only literal widening narrow:
1383// - only strict equality-style numeric predicates are eligible
1384// - ordering already uses `NumericWiden`
1385// - text and expression-wrapped predicates stay untouched
1386fn canonicalize_sql_compare_for_model(
1387    model: &'static EntityModel,
1388    cmp: &mut crate::db::predicate::ComparePredicate,
1389) {
1390    if cmp.coercion.id != CoercionId::Strict {
1391        return;
1392    }
1393
1394    let Some(field_kind) = model_field_kind(model, &cmp.field) else {
1395        return;
1396    };
1397
1398    match cmp.op {
1399        CompareOp::Eq | CompareOp::Ne => {
1400            if let Some(value) =
1401                canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &cmp.value)
1402            {
1403                cmp.value = value;
1404            }
1405        }
1406        CompareOp::In | CompareOp::NotIn => {
1407            let Value::List(items) = &cmp.value else {
1408                return;
1409            };
1410
1411            let items = items
1412                .iter()
1413                .map(|item| {
1414                    canonicalize_strict_sql_numeric_value_for_kind(&field_kind, item)
1415                        .unwrap_or_else(|| item.clone())
1416                })
1417                .collect();
1418            cmp.value = Value::List(items);
1419        }
1420        CompareOp::Lt
1421        | CompareOp::Lte
1422        | CompareOp::Gt
1423        | CompareOp::Gte
1424        | CompareOp::Contains
1425        | CompareOp::StartsWith
1426        | CompareOp::EndsWith => {}
1427    }
1428}
1429
1430// Convert one parsed SQL numeric literal into the exact runtime `Value` variant
1431// required by the field kind when that conversion is lossless and unambiguous.
1432// This preserves strict equality semantics while still letting SQL express
1433// unsigned-width comparisons such as `Nat16`/`u64` fields.
1434fn canonicalize_strict_sql_numeric_value_for_kind(
1435    kind: &FieldKind,
1436    value: &Value,
1437) -> Option<Value> {
1438    match kind {
1439        FieldKind::Relation { key_kind, .. } => {
1440            canonicalize_strict_sql_numeric_value_for_kind(key_kind, value)
1441        }
1442        FieldKind::Int => match value {
1443            Value::Int(inner) => Some(Value::Int(*inner)),
1444            Value::Uint(inner) => i64::try_from(*inner).ok().map(Value::Int),
1445            _ => None,
1446        },
1447        FieldKind::Uint => match value {
1448            Value::Int(inner) => u64::try_from(*inner).ok().map(Value::Uint),
1449            Value::Uint(inner) => Some(Value::Uint(*inner)),
1450            _ => None,
1451        },
1452        FieldKind::Account
1453        | FieldKind::Blob
1454        | FieldKind::Bool
1455        | FieldKind::Date
1456        | FieldKind::Decimal { .. }
1457        | FieldKind::Duration
1458        | FieldKind::Enum { .. }
1459        | FieldKind::Float32
1460        | FieldKind::Float64
1461        | FieldKind::Int128
1462        | FieldKind::IntBig
1463        | FieldKind::List(_)
1464        | FieldKind::Map { .. }
1465        | FieldKind::Principal
1466        | FieldKind::Set(_)
1467        | FieldKind::Structured { .. }
1468        | FieldKind::Subaccount
1469        | FieldKind::Text
1470        | FieldKind::Timestamp
1471        | FieldKind::Uint128
1472        | FieldKind::UintBig
1473        | FieldKind::Ulid
1474        | FieldKind::Unit => None,
1475    }
1476}
1477
1478#[inline(never)]
1479pub(in crate::db) fn apply_lowered_select_shape(
1480    mut query: StructuralQuery,
1481    lowered: LoweredSelectShape,
1482) -> Result<StructuralQuery, SqlLoweringError> {
1483    let LoweredSelectShape {
1484        scalar_projection_fields,
1485        grouped_projection_aggregates,
1486        group_by_fields,
1487        distinct,
1488        having,
1489        predicate,
1490        order_by,
1491        limit,
1492        offset,
1493    } = lowered;
1494    let model = query.model();
1495
1496    // Phase 1: apply grouped declaration semantics.
1497    for field in group_by_fields {
1498        query = query.group_by(field)?;
1499    }
1500
1501    // Phase 2: apply scalar DISTINCT and projection contracts.
1502    if distinct {
1503        query = query.distinct();
1504    }
1505    if let Some(fields) = scalar_projection_fields {
1506        query = query.select_fields(fields);
1507    }
1508    for aggregate in grouped_projection_aggregates {
1509        query = query.aggregate(lower_aggregate_call(aggregate)?);
1510    }
1511
1512    // Phase 3: bind resolved HAVING clauses against grouped terminals.
1513    for clause in having {
1514        match clause {
1515            ResolvedHavingClause::GroupField { field, op, value } => {
1516                let value = model_field_kind(model, &field)
1517                    .and_then(|field_kind| {
1518                        canonicalize_strict_sql_numeric_value_for_kind(&field_kind, &value)
1519                    })
1520                    .unwrap_or(value);
1521                query = query.having_group(field, op, value)?;
1522            }
1523            ResolvedHavingClause::Aggregate {
1524                aggregate_index,
1525                op,
1526                value,
1527            } => {
1528                query = query.having_aggregate(aggregate_index, op, value)?;
1529            }
1530        }
1531    }
1532
1533    // Phase 4: attach the shared filter/order/page tail through the base-query lane.
1534    Ok(apply_lowered_base_query_shape(
1535        query,
1536        LoweredBaseQueryShape {
1537            predicate: predicate
1538                .map(|predicate| canonicalize_sql_predicate_for_model(model, predicate)),
1539            order_by,
1540            limit,
1541            offset,
1542        },
1543    ))
1544}
1545
1546fn apply_lowered_base_query_shape(
1547    mut query: StructuralQuery,
1548    lowered: LoweredBaseQueryShape,
1549) -> StructuralQuery {
1550    if let Some(predicate) = lowered.predicate {
1551        query = query.filter(predicate);
1552    }
1553    query = apply_order_terms_structural(query, lowered.order_by);
1554    if let Some(limit) = lowered.limit {
1555        query = query.limit(limit);
1556    }
1557    if let Some(offset) = lowered.offset {
1558        query = query.offset(offset);
1559    }
1560
1561    query
1562}
1563
1564pub(in crate::db) fn bind_lowered_sql_query_structural(
1565    model: &'static crate::model::entity::EntityModel,
1566    lowered: LoweredSqlQuery,
1567    consistency: MissingRowPolicy,
1568) -> Result<StructuralQuery, SqlLoweringError> {
1569    match lowered {
1570        LoweredSqlQuery::Select(select) => {
1571            bind_lowered_sql_select_query_structural(model, select, consistency)
1572        }
1573        LoweredSqlQuery::Delete(delete) => Ok(bind_lowered_sql_delete_query_structural(
1574            model,
1575            delete,
1576            consistency,
1577        )),
1578    }
1579}
1580
1581/// Bind one lowered SQL SELECT shape onto the structural query surface.
1582///
1583/// This keeps the field-only SQL read lane narrow and owner-local: any caller
1584/// that already resolved entity authority can reuse the same lowered-SELECT to
1585/// structural-query boundary without reopening SQL shape application itself.
1586pub(in crate::db) fn bind_lowered_sql_select_query_structural(
1587    model: &'static crate::model::entity::EntityModel,
1588    select: LoweredSelectShape,
1589    consistency: MissingRowPolicy,
1590) -> Result<StructuralQuery, SqlLoweringError> {
1591    apply_lowered_select_shape(StructuralQuery::new(model, consistency), select)
1592}
1593
1594pub(in crate::db) fn bind_lowered_sql_delete_query_structural(
1595    model: &'static crate::model::entity::EntityModel,
1596    delete: LoweredBaseQueryShape,
1597    consistency: MissingRowPolicy,
1598) -> StructuralQuery {
1599    apply_lowered_base_query_shape(StructuralQuery::new(model, consistency).delete(), delete)
1600}
1601
1602pub(in crate::db) fn bind_lowered_sql_query<E: EntityKind>(
1603    lowered: LoweredSqlQuery,
1604    consistency: MissingRowPolicy,
1605) -> Result<Query<E>, SqlLoweringError> {
1606    let structural = bind_lowered_sql_query_structural(E::MODEL, lowered, consistency)?;
1607
1608    Ok(Query::from_inner(structural))
1609}
1610
1611fn bind_lowered_sql_global_aggregate_command<E: EntityKind>(
1612    lowered: LoweredSqlGlobalAggregateCommand,
1613    consistency: MissingRowPolicy,
1614) -> Result<SqlGlobalAggregateCommand<E>, SqlLoweringError> {
1615    let terminal = bind_lowered_sql_global_aggregate_terminal::<E>(lowered.terminal)?;
1616
1617    Ok(SqlGlobalAggregateCommand {
1618        query: Query::from_inner(apply_lowered_base_query_shape(
1619            StructuralQuery::new(E::MODEL, consistency),
1620            lowered.query,
1621        )),
1622        terminal,
1623    })
1624}
1625
1626fn bind_lowered_sql_global_aggregate_command_structural(
1627    model: &'static crate::model::entity::EntityModel,
1628    lowered: LoweredSqlGlobalAggregateCommand,
1629    consistency: MissingRowPolicy,
1630) -> SqlGlobalAggregateCommandCore {
1631    SqlGlobalAggregateCommandCore {
1632        query: apply_lowered_base_query_shape(
1633            StructuralQuery::new(model, consistency),
1634            lowered.query,
1635        ),
1636        terminal: lowered.terminal,
1637    }
1638}
1639
1640fn lower_global_aggregate_terminal(
1641    projection: SqlProjection,
1642) -> Result<SqlGlobalAggregateTerminal, SqlLoweringError> {
1643    let SqlProjection::Items(items) = projection else {
1644        return Err(SqlLoweringError::unsupported_select_projection());
1645    };
1646    if items.len() != 1 {
1647        return Err(SqlLoweringError::unsupported_select_projection());
1648    }
1649
1650    let Some(SqlSelectItem::Aggregate(aggregate)) = items.into_iter().next() else {
1651        return Err(SqlLoweringError::unsupported_select_projection());
1652    };
1653
1654    match lower_sql_aggregate_shape(aggregate)? {
1655        LoweredSqlAggregateShape::CountRows => Ok(SqlGlobalAggregateTerminal::CountRows),
1656        LoweredSqlAggregateShape::CountField(field) => {
1657            Ok(SqlGlobalAggregateTerminal::CountField(field))
1658        }
1659        LoweredSqlAggregateShape::FieldTarget {
1660            kind: SqlAggregateKind::Sum,
1661            field,
1662        } => Ok(SqlGlobalAggregateTerminal::SumField(field)),
1663        LoweredSqlAggregateShape::FieldTarget {
1664            kind: SqlAggregateKind::Avg,
1665            field,
1666        } => Ok(SqlGlobalAggregateTerminal::AvgField(field)),
1667        LoweredSqlAggregateShape::FieldTarget {
1668            kind: SqlAggregateKind::Min,
1669            field,
1670        } => Ok(SqlGlobalAggregateTerminal::MinField(field)),
1671        LoweredSqlAggregateShape::FieldTarget {
1672            kind: SqlAggregateKind::Max,
1673            field,
1674        } => Ok(SqlGlobalAggregateTerminal::MaxField(field)),
1675        LoweredSqlAggregateShape::FieldTarget {
1676            kind: SqlAggregateKind::Count,
1677            ..
1678        } => Err(SqlLoweringError::unsupported_select_projection()),
1679    }
1680}
1681
1682fn lower_sql_aggregate_shape(
1683    call: SqlAggregateCall,
1684) -> Result<LoweredSqlAggregateShape, SqlLoweringError> {
1685    match (call.kind, call.field) {
1686        (SqlAggregateKind::Count, None) => Ok(LoweredSqlAggregateShape::CountRows),
1687        (SqlAggregateKind::Count, Some(field)) => Ok(LoweredSqlAggregateShape::CountField(field)),
1688        (
1689            kind @ (SqlAggregateKind::Sum
1690            | SqlAggregateKind::Avg
1691            | SqlAggregateKind::Min
1692            | SqlAggregateKind::Max),
1693            Some(field),
1694        ) => Ok(LoweredSqlAggregateShape::FieldTarget { kind, field }),
1695        _ => Err(SqlLoweringError::unsupported_select_projection()),
1696    }
1697}
1698
1699fn grouped_projection_aggregate_calls(
1700    projection: &SqlProjection,
1701    group_by_fields: &[String],
1702) -> Result<Vec<SqlAggregateCall>, SqlLoweringError> {
1703    if group_by_fields.is_empty() {
1704        return Err(SqlLoweringError::unsupported_select_group_by());
1705    }
1706
1707    let SqlProjection::Items(items) = projection else {
1708        return Err(SqlLoweringError::unsupported_select_group_by());
1709    };
1710
1711    let mut projected_group_fields = Vec::<String>::new();
1712    let mut aggregate_calls = Vec::<SqlAggregateCall>::new();
1713    let mut seen_aggregate = false;
1714
1715    for item in items {
1716        match item {
1717            SqlSelectItem::Field(field) => {
1718                // Keep grouped projection deterministic and mappable to grouped
1719                // response contracts: group keys must be declared first.
1720                if seen_aggregate {
1721                    return Err(SqlLoweringError::unsupported_select_group_by());
1722                }
1723                projected_group_fields.push(field.clone());
1724            }
1725            SqlSelectItem::Aggregate(aggregate) => {
1726                seen_aggregate = true;
1727                aggregate_calls.push(aggregate.clone());
1728            }
1729            SqlSelectItem::TextFunction(_) => {
1730                return Err(SqlLoweringError::unsupported_select_group_by());
1731            }
1732        }
1733    }
1734
1735    if aggregate_calls.is_empty() || projected_group_fields.as_slice() != group_by_fields {
1736        return Err(SqlLoweringError::unsupported_select_group_by());
1737    }
1738
1739    Ok(aggregate_calls)
1740}
1741
1742fn lower_aggregate_call(
1743    call: SqlAggregateCall,
1744) -> Result<crate::db::query::builder::AggregateExpr, SqlLoweringError> {
1745    match lower_sql_aggregate_shape(call)? {
1746        LoweredSqlAggregateShape::CountRows => Ok(count()),
1747        LoweredSqlAggregateShape::CountField(field) => Ok(count_by(field)),
1748        LoweredSqlAggregateShape::FieldTarget {
1749            kind: SqlAggregateKind::Sum,
1750            field,
1751        } => Ok(sum(field)),
1752        LoweredSqlAggregateShape::FieldTarget {
1753            kind: SqlAggregateKind::Avg,
1754            field,
1755        } => Ok(avg(field)),
1756        LoweredSqlAggregateShape::FieldTarget {
1757            kind: SqlAggregateKind::Min,
1758            field,
1759        } => Ok(min_by(field)),
1760        LoweredSqlAggregateShape::FieldTarget {
1761            kind: SqlAggregateKind::Max,
1762            field,
1763        } => Ok(max_by(field)),
1764        LoweredSqlAggregateShape::FieldTarget {
1765            kind: SqlAggregateKind::Count,
1766            ..
1767        } => Err(SqlLoweringError::unsupported_select_projection()),
1768    }
1769}
1770
1771fn resolve_having_aggregate_index(
1772    target: &SqlAggregateCall,
1773    grouped_projection_aggregates: &[SqlAggregateCall],
1774) -> Result<usize, SqlLoweringError> {
1775    let mut matched = grouped_projection_aggregates
1776        .iter()
1777        .enumerate()
1778        .filter_map(|(index, aggregate)| (aggregate == target).then_some(index));
1779    let Some(index) = matched.next() else {
1780        return Err(SqlLoweringError::unsupported_select_having());
1781    };
1782    if matched.next().is_some() {
1783        return Err(SqlLoweringError::unsupported_select_having());
1784    }
1785
1786    Ok(index)
1787}
1788
1789fn lower_delete_shape(statement: SqlDeleteStatement) -> LoweredBaseQueryShape {
1790    let SqlDeleteStatement {
1791        predicate,
1792        order_by,
1793        limit,
1794        entity: _,
1795    } = statement;
1796
1797    LoweredBaseQueryShape {
1798        predicate,
1799        order_by,
1800        limit,
1801        offset: None,
1802    }
1803}
1804
1805fn apply_order_terms_structural(
1806    mut query: StructuralQuery,
1807    order_by: Vec<crate::db::sql::parser::SqlOrderTerm>,
1808) -> StructuralQuery {
1809    for term in order_by {
1810        query = match term.direction {
1811            SqlOrderDirection::Asc => query.order_by(term.field),
1812            SqlOrderDirection::Desc => query.order_by_desc(term.field),
1813        };
1814    }
1815
1816    query
1817}
1818
1819fn normalize_having_clauses(
1820    clauses: Vec<SqlHavingClause>,
1821    entity_scope: &[String],
1822) -> Vec<SqlHavingClause> {
1823    clauses
1824        .into_iter()
1825        .map(|clause| SqlHavingClause {
1826            symbol: normalize_having_symbol(clause.symbol, entity_scope),
1827            op: clause.op,
1828            value: clause.value,
1829        })
1830        .collect()
1831}
1832
1833fn normalize_having_symbol(symbol: SqlHavingSymbol, entity_scope: &[String]) -> SqlHavingSymbol {
1834    match symbol {
1835        SqlHavingSymbol::Field(field) => {
1836            SqlHavingSymbol::Field(normalize_identifier_to_scope(field, entity_scope))
1837        }
1838        SqlHavingSymbol::Aggregate(aggregate) => SqlHavingSymbol::Aggregate(
1839            normalize_aggregate_call_identifiers(aggregate, entity_scope),
1840        ),
1841    }
1842}
1843
1844fn normalize_aggregate_call_identifiers(
1845    aggregate: SqlAggregateCall,
1846    entity_scope: &[String],
1847) -> SqlAggregateCall {
1848    SqlAggregateCall {
1849        kind: aggregate.kind,
1850        field: aggregate
1851            .field
1852            .map(|field| normalize_identifier_to_scope(field, entity_scope)),
1853    }
1854}
1855
1856// Build one identifier scope used for reducing SQL-qualified field references
1857// (`entity.field`, `schema.entity.field`) into canonical planner field names.
1858fn sql_entity_scope_candidates(sql_entity: &str, expected_entity: &'static str) -> Vec<String> {
1859    let mut out = Vec::new();
1860    out.push(sql_entity.to_string());
1861    out.push(expected_entity.to_string());
1862
1863    if let Some(last) = identifier_last_segment(sql_entity) {
1864        out.push(last.to_string());
1865    }
1866    if let Some(last) = identifier_last_segment(expected_entity) {
1867        out.push(last.to_string());
1868    }
1869
1870    out
1871}
1872
1873fn normalize_projection_identifiers(
1874    projection: SqlProjection,
1875    entity_scope: &[String],
1876) -> SqlProjection {
1877    match projection {
1878        SqlProjection::All => SqlProjection::All,
1879        SqlProjection::Items(items) => SqlProjection::Items(
1880            items
1881                .into_iter()
1882                .map(|item| match item {
1883                    SqlSelectItem::Field(field) => {
1884                        SqlSelectItem::Field(normalize_identifier(field, entity_scope))
1885                    }
1886                    SqlSelectItem::Aggregate(aggregate) => {
1887                        SqlSelectItem::Aggregate(SqlAggregateCall {
1888                            kind: aggregate.kind,
1889                            field: aggregate
1890                                .field
1891                                .map(|field| normalize_identifier(field, entity_scope)),
1892                        })
1893                    }
1894                    SqlSelectItem::TextFunction(SqlTextFunctionCall {
1895                        function,
1896                        field,
1897                        literal,
1898                        literal2,
1899                        literal3,
1900                    }) => SqlSelectItem::TextFunction(SqlTextFunctionCall {
1901                        function,
1902                        field: normalize_identifier(field, entity_scope),
1903                        literal,
1904                        literal2,
1905                        literal3,
1906                    }),
1907                })
1908                .collect(),
1909        ),
1910    }
1911}
1912
1913fn normalize_order_terms(
1914    terms: Vec<crate::db::sql::parser::SqlOrderTerm>,
1915    entity_scope: &[String],
1916) -> Vec<crate::db::sql::parser::SqlOrderTerm> {
1917    terms
1918        .into_iter()
1919        .map(|term| crate::db::sql::parser::SqlOrderTerm {
1920            field: normalize_order_term_identifier(term.field, entity_scope),
1921            direction: term.direction,
1922        })
1923        .collect()
1924}
1925
1926fn normalize_order_term_identifier(identifier: String, entity_scope: &[String]) -> String {
1927    let Some(expression) = ExpressionOrderTerm::parse(identifier.as_str()) else {
1928        return normalize_identifier(identifier, entity_scope);
1929    };
1930    let normalized_field = normalize_identifier(expression.field().to_string(), entity_scope);
1931
1932    expression.canonical_text_with_field(normalized_field.as_str())
1933}
1934
1935fn normalize_identifier_list(fields: Vec<String>, entity_scope: &[String]) -> Vec<String> {
1936    fields
1937        .into_iter()
1938        .map(|field| normalize_identifier(field, entity_scope))
1939        .collect()
1940}
1941
1942// SQL lowering only adapts identifier qualification (`entity.field` -> `field`)
1943// and delegates predicate-tree traversal ownership to `db::predicate`.
1944fn adapt_predicate_identifiers_to_scope(
1945    predicate: Predicate,
1946    entity_scope: &[String],
1947) -> Predicate {
1948    rewrite_field_identifiers(predicate, |field| normalize_identifier(field, entity_scope))
1949}
1950
1951fn normalize_identifier(identifier: String, entity_scope: &[String]) -> String {
1952    normalize_identifier_to_scope(identifier, entity_scope)
1953}
1954
1955fn ensure_entity_matches_expected(
1956    sql_entity: &str,
1957    expected_entity: &'static str,
1958) -> Result<(), SqlLoweringError> {
1959    if identifiers_tail_match(sql_entity, expected_entity) {
1960        return Ok(());
1961    }
1962
1963    Err(SqlLoweringError::entity_mismatch(
1964        sql_entity,
1965        expected_entity,
1966    ))
1967}