Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::IndexCompileTarget,
10        predicate::{Predicate, PredicateProgram},
11        query::plan::{
12            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
13            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
14            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
15            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
16            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
17            derive_logical_pushdown_eligibility,
18            expr::{
19                Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
20                compile_scalar_projection_expr, compile_scalar_projection_plan,
21                projection_field_expr,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
25            lower_projection_identity, lower_projection_intent,
26            residual_query_predicate_after_access_path_bounds,
27            residual_query_predicate_after_filtered_access,
28            resolved_grouped_distinct_execution_strategy_for_model,
29        },
30    },
31    error::InternalError,
32    model::{
33        entity::{EntityModel, resolve_field_slot},
34        index::IndexKeyItemsRef,
35    },
36};
37
38impl QueryMode {
39    /// True if this mode represents a load intent.
40    #[must_use]
41    pub const fn is_load(&self) -> bool {
42        match self {
43            Self::Load(_) => true,
44            Self::Delete(_) => false,
45        }
46    }
47
48    /// True if this mode represents a delete intent.
49    #[must_use]
50    pub const fn is_delete(&self) -> bool {
51        match self {
52            Self::Delete(_) => true,
53            Self::Load(_) => false,
54        }
55    }
56}
57
58impl LogicalPlan {
59    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
60    #[must_use]
61    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62        match self {
63            Self::Scalar(plan) => plan,
64            Self::Grouped(plan) => &plan.scalar,
65        }
66    }
67
68    /// Borrow scalar semantic fields mutably across logical variants for tests.
69    #[must_use]
70    #[cfg(test)]
71    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72        match self {
73            Self::Scalar(plan) => plan,
74            Self::Grouped(plan) => &mut plan.scalar,
75        }
76    }
77
78    /// Test-only shorthand for explicit scalar semantic borrowing.
79    #[must_use]
80    #[cfg(test)]
81    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82        self.scalar_semantics()
83    }
84
85    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
86    #[must_use]
87    #[cfg(test)]
88    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89        self.scalar_semantics_mut()
90    }
91}
92
93impl AccessPlannedQuery {
94    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
95    #[must_use]
96    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97        self.logical.scalar_semantics()
98    }
99
100    /// Borrow scalar semantic fields mutably across logical variants for tests.
101    #[must_use]
102    #[cfg(test)]
103    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
104        self.logical.scalar_semantics_mut()
105    }
106
107    /// Test-only shorthand for explicit scalar plan borrowing.
108    #[must_use]
109    #[cfg(test)]
110    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
111        self.scalar_plan()
112    }
113
114    /// Test-only shorthand for explicit mutable scalar plan borrowing.
115    #[must_use]
116    #[cfg(test)]
117    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
118        self.scalar_plan_mut()
119    }
120
121    /// Borrow grouped semantic fields when this plan is grouped.
122    #[must_use]
123    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
124        match &self.logical {
125            LogicalPlan::Scalar(_) => None,
126            LogicalPlan::Grouped(plan) => Some(plan),
127        }
128    }
129
130    /// Lower this plan into one canonical planner-owned projection semantic spec.
131    #[must_use]
132    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
133        if let Some(static_shape) = &self.static_planning_shape {
134            return static_shape.projection_spec.clone();
135        }
136
137        lower_projection_intent(model, &self.logical, &self.projection_selection)
138    }
139
140    /// Lower this plan into one projection semantic shape for identity hashing.
141    #[must_use]
142    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
143        lower_projection_identity(&self.logical)
144    }
145
146    /// Return the executor-facing predicate after removing only filtered-index
147    /// guard clauses the chosen access path already proves.
148    ///
149    /// This conservative form is used by preparation/explain surfaces that
150    /// still need to see access-bound equalities as index-predicate input.
151    #[must_use]
152    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
153        let query_predicate = self.scalar_plan().predicate.as_ref()?;
154
155        match self.access.selected_index_model() {
156            Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
157            None => Some(query_predicate.clone()),
158        }
159    }
160
161    /// Return the executor-facing residual predicate after removing any
162    /// filtered-index guard clauses and fixed access-bound equalities already
163    /// guaranteed by the chosen path.
164    #[must_use]
165    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
166        // Phase 1: strip only filtered-index guard clauses the chosen access
167        // path already proves.
168        let filtered_residual = self.execution_preparation_predicate();
169        let filtered_residual = filtered_residual.as_ref()?;
170
171        // Phase 2: strip any additional equality clauses already guaranteed by
172        // the concrete access-path bounds, such as `tier = 'gold'` on one
173        // selected `IndexPrefix(tier='gold', ...)` route.
174        residual_query_predicate_after_access_path_bounds(self.access.as_path(), filtered_residual)
175    }
176
177    /// Borrow the planner-compiled execution-preparation predicate program.
178    #[must_use]
179    pub(in crate::db) const fn execution_preparation_compiled_predicate(
180        &self,
181    ) -> Option<&PredicateProgram> {
182        self.static_planning_shape()
183            .execution_preparation_compiled_predicate
184            .as_ref()
185    }
186
187    /// Borrow the planner-compiled effective runtime predicate program.
188    #[must_use]
189    pub(in crate::db) const fn effective_runtime_compiled_predicate(
190        &self,
191    ) -> Option<&PredicateProgram> {
192        match self
193            .static_planning_shape()
194            .effective_runtime_filter_program
195            .as_ref()
196        {
197            Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
198            Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
199        }
200    }
201
202    /// Borrow the planner-compiled effective runtime scalar filter expression.
203    #[must_use]
204    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
205        &self,
206    ) -> Option<&ScalarProjectionExpr> {
207        match self
208            .static_planning_shape()
209            .effective_runtime_filter_program
210            .as_ref()
211        {
212            Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
213            Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
214        }
215    }
216
217    /// Borrow the planner-frozen effective runtime scalar filter program.
218    #[must_use]
219    pub(in crate::db) const fn effective_runtime_filter_program(
220        &self,
221    ) -> Option<&EffectiveRuntimeFilterProgram> {
222        self.static_planning_shape()
223            .effective_runtime_filter_program
224            .as_ref()
225    }
226
227    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
228    #[must_use]
229    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
230        if !self.scalar_plan().distinct {
231            return DistinctExecutionStrategy::None;
232        }
233
234        // DISTINCT on duplicate-safe single-path access shapes is a planner
235        // no-op for runtime dedup mechanics. Composite shapes can surface
236        // duplicate keys and therefore retain explicit dedup execution.
237        match distinct_runtime_dedup_strategy(&self.access) {
238            Some(strategy) => strategy,
239            None => DistinctExecutionStrategy::None,
240        }
241    }
242
243    /// Freeze one planner-owned route profile after model validation completes.
244    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
245        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
246    }
247
248    /// Freeze planner-owned executor metadata after logical/access planning completes.
249    pub(in crate::db) fn finalize_static_planning_shape_for_model(
250        &mut self,
251        model: &EntityModel,
252    ) -> Result<(), InternalError> {
253        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
254
255        Ok(())
256    }
257
258    /// Build one immutable execution-shape signature contract for runtime layers.
259    #[must_use]
260    pub(in crate::db) fn execution_shape_signature(
261        &self,
262        entity_path: &'static str,
263    ) -> ExecutionShapeSignature {
264        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
265    }
266
267    /// Return whether the chosen access contract fully satisfies the current
268    /// scalar query predicate without any additional post-access filtering.
269    #[must_use]
270    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
271        self.scalar_plan().predicate.is_some() && self.effective_execution_predicate().is_none()
272    }
273
274    /// Return whether scalar filter semantics still require post-access
275    /// filtering after accounting for any derived pushdown predicate and
276    /// access-path equality bounds.
277    #[must_use]
278    pub(in crate::db) fn has_residual_filter(&self) -> bool {
279        match (
280            self.scalar_plan().filter_expr.as_ref(),
281            self.scalar_plan().predicate.as_ref(),
282        ) {
283            (None, None) => false,
284            (Some(_), None) => true,
285            (Some(_) | None, Some(_)) => !self.predicate_fully_satisfied_by_access_contract(),
286        }
287    }
288
289    /// Transitional alias for existing residual-filter call sites while scalar
290    /// WHERE ownership is moving from predicate-only to expression-first.
291    #[must_use]
292    pub(in crate::db) fn has_residual_predicate(&self) -> bool {
293        self.has_residual_filter()
294    }
295
296    /// Borrow the planner-frozen compiled scalar projection program.
297    #[must_use]
298    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
299        self.static_planning_shape()
300            .scalar_projection_plan
301            .as_deref()
302    }
303
304    /// Borrow the planner-frozen primary-key field name.
305    #[must_use]
306    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
307        self.static_planning_shape().primary_key_name
308    }
309
310    /// Borrow the planner-frozen projection slot reachability set.
311    #[must_use]
312    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
313        self.static_planning_shape()
314            .projection_referenced_slots
315            .as_slice()
316    }
317
318    /// Borrow the planner-frozen mask for direct projected output slots.
319    #[must_use]
320    #[cfg(any(test, feature = "diagnostics"))]
321    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
322        self.static_planning_shape().projected_slot_mask.as_slice()
323    }
324
325    /// Return whether projection remains the full model-identity field list.
326    #[must_use]
327    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
328        self.static_planning_shape().projection_is_model_identity
329    }
330
331    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
332    #[must_use]
333    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
334        self.static_planning_shape()
335            .order_referenced_slots
336            .as_deref()
337    }
338
339    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
340    #[must_use]
341    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
342        self.static_planning_shape().resolved_order.as_ref()
343    }
344
345    /// Borrow the planner-frozen access slot map used by index predicate compilation.
346    #[must_use]
347    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
348        self.static_planning_shape().slot_map.as_deref()
349    }
350
351    /// Borrow grouped aggregate execution specs already resolved during static planning.
352    #[must_use]
353    pub(in crate::db) fn grouped_aggregate_execution_specs(
354        &self,
355    ) -> Option<&[GroupedAggregateExecutionSpec]> {
356        self.static_planning_shape()
357            .grouped_aggregate_execution_specs
358            .as_deref()
359    }
360
361    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
362    #[must_use]
363    pub(in crate::db) const fn grouped_distinct_execution_strategy(
364        &self,
365    ) -> Option<&GroupedDistinctExecutionStrategy> {
366        self.static_planning_shape()
367            .grouped_distinct_execution_strategy
368            .as_ref()
369    }
370
371    /// Borrow the frozen projection semantic shape without reopening model ownership.
372    #[must_use]
373    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
374        &self.static_planning_shape().projection_spec
375    }
376
377    /// Borrow the frozen direct projection slots without reopening model ownership.
378    #[must_use]
379    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
380        self.static_planning_shape()
381            .projection_direct_slots
382            .as_deref()
383    }
384
385    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
386    #[must_use]
387    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
388        self.static_planning_shape()
389            .index_compile_targets
390            .as_deref()
391    }
392
393    const fn static_planning_shape(&self) -> &StaticPlanningShape {
394        self.static_planning_shape
395            .as_ref()
396            .expect("access-planned queries must freeze static planning shape before execution")
397    }
398}
399
400fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
401    match access {
402        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
403            Some(DistinctExecutionStrategy::PreOrdered)
404        }
405        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
406            Some(DistinctExecutionStrategy::HashMaterialize)
407        }
408        AccessPlan::Path(_) => None,
409    }
410}
411
412fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
413    let is_grouped_safe = plan
414        .grouped_plan()
415        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
416
417    ContinuationPolicy::new(
418        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
419        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
420        is_grouped_safe,
421    )
422}
423
424/// Project one planner-owned route profile from the finalized logical+access plan.
425#[must_use]
426pub(in crate::db) fn project_planner_route_profile_for_model(
427    model: &EntityModel,
428    plan: &AccessPlannedQuery,
429) -> PlannerRouteProfile {
430    let secondary_order_contract = plan
431        .scalar_plan()
432        .order
433        .as_ref()
434        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
435
436    PlannerRouteProfile::new(
437        derive_continuation_policy_validated(plan),
438        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
439        secondary_order_contract,
440    )
441}
442
443fn project_static_planning_shape_for_model(
444    model: &EntityModel,
445    plan: &AccessPlannedQuery,
446) -> Result<StaticPlanningShape, InternalError> {
447    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
448    let execution_preparation_compiled_predicate =
449        compile_optional_predicate(model, plan.execution_preparation_predicate().as_ref());
450    let effective_runtime_filter_program = compile_effective_runtime_filter_program(model, plan)?;
451    let scalar_projection_plan =
452        if plan.grouped_plan().is_none() {
453            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
454            InternalError::query_executor_invariant(
455                "scalar projection program must compile during static planning finalization",
456            )
457        })?)
458        } else {
459            None
460        };
461    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
462        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
463    let projection_direct_slots =
464        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
465    let projection_referenced_slots =
466        projection_referenced_slots_for_spec(model, &projection_spec)?;
467    let projected_slot_mask =
468        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
469    let projection_is_model_identity =
470        projection_is_model_identity_for_spec(model, &projection_spec);
471    let resolved_order = resolved_order_for_plan(model, plan)?;
472    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
473    let slot_map = slot_map_for_model_plan(model, plan);
474    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
475
476    Ok(StaticPlanningShape {
477        primary_key_name: model.primary_key.name,
478        projection_spec,
479        execution_preparation_compiled_predicate,
480        effective_runtime_filter_program,
481        scalar_projection_plan,
482        grouped_aggregate_execution_specs,
483        grouped_distinct_execution_strategy,
484        projection_direct_slots,
485        projection_referenced_slots,
486        projected_slot_mask,
487        projection_is_model_identity,
488        resolved_order,
489        order_referenced_slots,
490        slot_map,
491        index_compile_targets,
492    })
493}
494
495// Compile the executor-owned residual scalar filter contract once so runtime
496// can consume either the predicate fast path or the expression-first filter
497// path without rediscovering which boundary applies.
498fn compile_effective_runtime_filter_program(
499    model: &EntityModel,
500    plan: &AccessPlannedQuery,
501) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
502    if !plan.has_residual_filter() {
503        return Ok(None);
504    }
505
506    // Keep the existing predicate fast path when the residual semantics still
507    // fit the derived predicate contract. The expression-owned lane is only
508    // needed once pushdown loses semantic coverage and a residual predicate no
509    // longer exists.
510    if let Some(predicate) = plan.effective_execution_predicate().as_ref() {
511        return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
512            PredicateProgram::compile(model, predicate),
513        )));
514    }
515
516    if let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() {
517        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
518            InternalError::query_invalid_logical_plan(
519                "effective runtime scalar filter expression must compile during static planning finalization",
520            )
521        })?;
522
523        return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
524    }
525
526    Ok(None)
527}
528
529// Compile one optional planner-frozen predicate program while keeping the
530// static planning assembly path free of repeated `Option` mapping boilerplate.
531fn compile_optional_predicate(
532    model: &EntityModel,
533    predicate: Option<&Predicate>,
534) -> Option<PredicateProgram> {
535    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
536}
537
538// Resolve the grouped-only static planning semantics bundle once so grouped
539// aggregate execution specs and grouped DISTINCT strategy stay derived under
540// one shared grouped-plan branch.
541fn resolve_grouped_static_planning_semantics(
542    model: &EntityModel,
543    plan: &AccessPlannedQuery,
544    projection_spec: &ProjectionSpec,
545) -> Result<
546    (
547        Option<Vec<GroupedAggregateExecutionSpec>>,
548        Option<GroupedDistinctExecutionStrategy>,
549    ),
550    InternalError,
551> {
552    let Some(grouped) = plan.grouped_plan() else {
553        return Ok((None, None));
554    };
555
556    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
557        projection_spec,
558        grouped.group.group_fields.as_slice(),
559        grouped.group.aggregates.as_slice(),
560    )?;
561    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
562
563    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
564        model,
565        aggregate_specs.as_slice(),
566    )?);
567    let grouped_distinct_execution_strategy =
568        Some(resolved_grouped_distinct_execution_strategy_for_model(
569            model,
570            grouped.group.group_fields.as_slice(),
571            grouped.group.aggregates.as_slice(),
572            grouped.having_expr.as_ref(),
573        )?);
574
575    Ok((
576        grouped_aggregate_execution_specs,
577        grouped_distinct_execution_strategy,
578    ))
579}
580
581fn extend_grouped_having_aggregate_specs(
582    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
583    grouped: &GroupPlan,
584) -> Result<(), InternalError> {
585    if let Some(having_expr) = grouped.having_expr.as_ref() {
586        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
587    }
588
589    Ok(())
590}
591
592fn collect_grouped_having_expr_aggregate_specs(
593    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
594    expr: &Expr,
595) -> Result<(), InternalError> {
596    match expr {
597        Expr::Aggregate(aggregate_expr) => {
598            let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
599
600            if aggregate_specs
601                .iter()
602                .all(|current| current != &aggregate_spec)
603            {
604                aggregate_specs.push(aggregate_spec);
605            }
606        }
607        Expr::Field(_) | Expr::Literal(_) => {}
608        Expr::FunctionCall { args, .. } => {
609            for arg in args {
610                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
611            }
612        }
613        Expr::Unary { expr, .. } => {
614            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
615        }
616        Expr::Case {
617            when_then_arms,
618            else_expr,
619        } => {
620            for arm in when_then_arms {
621                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
622                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
623            }
624
625            collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
626        }
627        Expr::Binary { left, right, .. } => {
628            collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
629            collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
630        }
631        #[cfg(test)]
632        Expr::Alias { expr, .. } => {
633            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
634        }
635    }
636
637    Ok(())
638}
639
640fn projection_referenced_slots_for_spec(
641    model: &EntityModel,
642    projection: &ProjectionSpec,
643) -> Result<Vec<usize>, InternalError> {
644    let mut referenced = vec![false; model.fields().len()];
645
646    for field in projection.fields() {
647        mark_projection_expr_slots(
648            model,
649            projection_field_expr(field),
650            referenced.as_mut_slice(),
651        )?;
652    }
653
654    Ok(referenced
655        .into_iter()
656        .enumerate()
657        .filter_map(|(slot, required)| required.then_some(slot))
658        .collect())
659}
660
661fn mark_projection_expr_slots(
662    model: &EntityModel,
663    expr: &Expr,
664    referenced: &mut [bool],
665) -> Result<(), InternalError> {
666    match expr {
667        Expr::Field(field_id) => {
668            let field_name = field_id.as_str();
669            let slot = resolve_required_field_slot(model, field_name, || {
670                InternalError::query_invalid_logical_plan(format!(
671                    "projection expression references unknown field '{field_name}'",
672                ))
673            })?;
674            referenced[slot] = true;
675        }
676        Expr::Literal(_) => {}
677        Expr::FunctionCall { args, .. } => {
678            for arg in args {
679                mark_projection_expr_slots(model, arg, referenced)?;
680            }
681        }
682        Expr::Case {
683            when_then_arms,
684            else_expr,
685        } => {
686            for arm in when_then_arms {
687                mark_projection_expr_slots(model, arm.condition(), referenced)?;
688                mark_projection_expr_slots(model, arm.result(), referenced)?;
689            }
690            mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
691        }
692        Expr::Aggregate(_) => {}
693        #[cfg(test)]
694        Expr::Alias { expr, .. } => {
695            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
696        }
697        Expr::Unary { expr, .. } => {
698            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
699        }
700        Expr::Binary { left, right, .. } => {
701            mark_projection_expr_slots(model, left.as_ref(), referenced)?;
702            mark_projection_expr_slots(model, right.as_ref(), referenced)?;
703        }
704    }
705
706    Ok(())
707}
708
709fn projected_slot_mask_for_spec(
710    model: &EntityModel,
711    direct_projection_slots: Option<&[usize]>,
712) -> Vec<bool> {
713    let mut projected_slots = vec![false; model.fields().len()];
714
715    let Some(direct_projection_slots) = direct_projection_slots else {
716        return projected_slots;
717    };
718
719    for slot in direct_projection_slots.iter().copied() {
720        if let Some(projected) = projected_slots.get_mut(slot) {
721            *projected = true;
722        }
723    }
724
725    projected_slots
726}
727
728fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
729    if projection.len() != model.fields().len() {
730        return false;
731    }
732
733    for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
734        match projected_field {
735            ProjectionField::Scalar {
736                expr: Expr::Field(field_id),
737                alias: None,
738            } if field_id.as_str() == field_model.name() => {}
739            ProjectionField::Scalar { .. } => return false,
740        }
741    }
742
743    true
744}
745
746fn resolved_order_for_plan(
747    model: &EntityModel,
748    plan: &AccessPlannedQuery,
749) -> Result<Option<ResolvedOrder>, InternalError> {
750    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
751        return Ok(None);
752    }
753
754    let Some(order) = plan.scalar_plan().order.as_ref() else {
755        return Ok(None);
756    };
757
758    let mut fields = Vec::with_capacity(order.fields.len());
759    for term in &order.fields {
760        fields.push(ResolvedOrderField::new(
761            resolved_order_value_source_for_term(model, term)?,
762            term.direction(),
763        ));
764    }
765
766    Ok(Some(ResolvedOrder::new(fields)))
767}
768
769fn resolved_order_value_source_for_term(
770    model: &EntityModel,
771    term: &crate::db::query::plan::OrderTerm,
772) -> Result<ResolvedOrderValueSource, InternalError> {
773    if term.direct_field().is_none() {
774        let rendered = term.rendered_label();
775        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
776        let compiled = compile_scalar_projection_expr(model, term.expr())
777            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
778
779        return Ok(ResolvedOrderValueSource::expression(compiled));
780    }
781
782    let field = term
783        .direct_field()
784        .expect("direct-field order branch should only execute for field-backed terms");
785    let slot = resolve_required_field_slot(model, field, || {
786        InternalError::query_invalid_logical_plan(format!(
787            "order expression references unknown field '{field}'",
788        ))
789    })?;
790
791    Ok(ResolvedOrderValueSource::direct_field(slot))
792}
793
794fn validate_resolved_order_expr_fields(
795    model: &EntityModel,
796    expr: &Expr,
797    rendered: &str,
798) -> Result<(), InternalError> {
799    match expr {
800        Expr::Field(field_id) => {
801            resolve_required_field_slot(model, field_id.as_str(), || {
802                InternalError::query_invalid_logical_plan(format!(
803                    "order expression references unknown field '{rendered}'",
804                ))
805            })?;
806        }
807        Expr::Literal(_) => {}
808        Expr::FunctionCall { args, .. } => {
809            for arg in args {
810                validate_resolved_order_expr_fields(model, arg, rendered)?;
811            }
812        }
813        Expr::Case {
814            when_then_arms,
815            else_expr,
816        } => {
817            for arm in when_then_arms {
818                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
819                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
820            }
821            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
822        }
823        Expr::Binary { left, right, .. } => {
824            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
825            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
826        }
827        Expr::Aggregate(_) => {
828            return Err(order_expression_scalar_seam_error(rendered));
829        }
830        #[cfg(test)]
831        Expr::Alias { .. } => {
832            return Err(order_expression_scalar_seam_error(rendered));
833        }
834        Expr::Unary { .. } => {
835            return Err(order_expression_scalar_seam_error(rendered));
836        }
837    }
838
839    Ok(())
840}
841
842// Resolve one model field slot while keeping planner invalid-logical-plan
843// error construction at the callsite that owns the diagnostic wording.
844fn resolve_required_field_slot<F>(
845    model: &EntityModel,
846    field: &str,
847    invalid_plan_error: F,
848) -> Result<usize, InternalError>
849where
850    F: FnOnce() -> InternalError,
851{
852    resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
853}
854
855// Keep the scalar-order expression seam violation text under one helper so the
856// parse validation and compile validation paths do not drift.
857fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
858    InternalError::query_invalid_logical_plan(format!(
859        "order expression '{rendered}' did not stay on the scalar expression seam",
860    ))
861}
862
863// Keep one stable executor-facing slot list for grouped order terms after the
864// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
865// now consumes this same referenced-slot contract instead of re-deriving order
866// sources from planner strategy at runtime.
867fn order_referenced_slots_for_resolved_order(
868    resolved_order: Option<&ResolvedOrder>,
869) -> Option<Vec<usize>> {
870    let resolved_order = resolved_order?;
871    let mut referenced = Vec::new();
872
873    // Keep one stable slot list without re-parsing order expressions after the
874    // planner has already frozen structural ORDER BY sources.
875    for field in resolved_order.fields() {
876        field.source().extend_referenced_slots(&mut referenced);
877    }
878
879    Some(referenced)
880}
881
882fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
883    let access_strategy = plan.access.resolve_strategy();
884    let executable = access_strategy.executable();
885
886    resolved_index_slots_for_access_path(model, executable)
887}
888
889fn resolved_index_slots_for_access_path(
890    model: &EntityModel,
891    access: &ExecutableAccessPlan<'_, crate::value::Value>,
892) -> Option<Vec<usize>> {
893    let path = access.as_path()?;
894    let path_capabilities = path.capabilities();
895    let index_fields = path_capabilities.index_fields_for_slot_map()?;
896    let mut slots = Vec::with_capacity(index_fields.len());
897
898    for field_name in index_fields {
899        let slot = resolve_field_slot(model, field_name)?;
900        slots.push(slot);
901    }
902
903    Some(slots)
904}
905
906fn index_compile_targets_for_model_plan(
907    model: &EntityModel,
908    plan: &AccessPlannedQuery,
909) -> Option<Vec<IndexCompileTarget>> {
910    let index = plan.access.as_path()?.selected_index_model()?;
911    let mut targets = Vec::new();
912
913    match index.key_items() {
914        IndexKeyItemsRef::Fields(fields) => {
915            for (component_index, &field_name) in fields.iter().enumerate() {
916                let field_slot = resolve_field_slot(model, field_name)?;
917                targets.push(IndexCompileTarget {
918                    component_index,
919                    field_slot,
920                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
921                });
922            }
923        }
924        IndexKeyItemsRef::Items(items) => {
925            for (component_index, &key_item) in items.iter().enumerate() {
926                let field_slot = resolve_field_slot(model, key_item.field())?;
927                targets.push(IndexCompileTarget {
928                    component_index,
929                    field_slot,
930                    key_item,
931                });
932            }
933        }
934    }
935
936    Some(targets)
937}