Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::IndexCompileTarget,
10        predicate::{PredicateExecutionModel, PredicateProgram},
11        query::plan::{
12            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
13            ExecutionShapeSignature, GroupHavingExpr, GroupHavingValueExpr, GroupPlan,
14            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
15            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
16            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
17            derive_logical_pushdown_eligibility,
18            expr::{
19                Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
20                compile_scalar_projection_expr, compile_scalar_projection_plan,
21                parse_supported_computed_order_expr, projection_field_expr,
22            },
23            group::GroupedAggregateProjectionSpec,
24            grouped_aggregate_execution_specs,
25            grouped_aggregate_projection_specs_from_projection_spec,
26            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
27            lower_projection_identity, lower_projection_intent,
28            residual_query_predicate_after_access_path_bounds,
29            residual_query_predicate_after_filtered_access,
30            resolved_grouped_distinct_execution_strategy_for_model,
31        },
32    },
33    error::InternalError,
34    model::{
35        entity::{EntityModel, resolve_field_slot},
36        index::IndexKeyItemsRef,
37    },
38};
39
40impl QueryMode {
41    /// True if this mode represents a load intent.
42    #[must_use]
43    pub const fn is_load(&self) -> bool {
44        match self {
45            Self::Load(_) => true,
46            Self::Delete(_) => false,
47        }
48    }
49
50    /// True if this mode represents a delete intent.
51    #[must_use]
52    pub const fn is_delete(&self) -> bool {
53        match self {
54            Self::Delete(_) => true,
55            Self::Load(_) => false,
56        }
57    }
58}
59
60impl LogicalPlan {
61    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
62    #[must_use]
63    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64        match self {
65            Self::Scalar(plan) => plan,
66            Self::Grouped(plan) => &plan.scalar,
67        }
68    }
69
70    /// Borrow scalar semantic fields mutably across logical variants for tests.
71    #[must_use]
72    #[cfg(test)]
73    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74        match self {
75            Self::Scalar(plan) => plan,
76            Self::Grouped(plan) => &mut plan.scalar,
77        }
78    }
79
80    /// Test-only shorthand for explicit scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84        self.scalar_semantics()
85    }
86
87    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
88    #[must_use]
89    #[cfg(test)]
90    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91        self.scalar_semantics_mut()
92    }
93}
94
95impl AccessPlannedQuery {
96    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
97    #[must_use]
98    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99        self.logical.scalar_semantics()
100    }
101
102    /// Borrow scalar semantic fields mutably across logical variants for tests.
103    #[must_use]
104    #[cfg(test)]
105    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
106        self.logical.scalar_semantics_mut()
107    }
108
109    /// Test-only shorthand for explicit scalar plan borrowing.
110    #[must_use]
111    #[cfg(test)]
112    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
113        self.scalar_plan()
114    }
115
116    /// Test-only shorthand for explicit mutable scalar plan borrowing.
117    #[must_use]
118    #[cfg(test)]
119    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
120        self.scalar_plan_mut()
121    }
122
123    /// Borrow grouped semantic fields when this plan is grouped.
124    #[must_use]
125    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
126        match &self.logical {
127            LogicalPlan::Scalar(_) => None,
128            LogicalPlan::Grouped(plan) => Some(plan),
129        }
130    }
131
132    /// Lower this plan into one canonical planner-owned projection semantic spec.
133    #[must_use]
134    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
135        if let Some(static_shape) = &self.static_planning_shape {
136            return static_shape.projection_spec.clone();
137        }
138
139        lower_projection_intent(model, &self.logical, &self.projection_selection)
140    }
141
142    /// Lower this plan into one projection semantic shape for identity hashing.
143    #[must_use]
144    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
145        lower_projection_identity(&self.logical)
146    }
147
148    /// Return the executor-facing predicate after removing only filtered-index
149    /// guard clauses the chosen access path already proves.
150    ///
151    /// This conservative form is used by preparation/explain surfaces that
152    /// still need to see access-bound equalities as index-predicate input.
153    #[must_use]
154    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<PredicateExecutionModel> {
155        let query_predicate = self.scalar_plan().predicate.as_ref()?;
156
157        match self.access.selected_index_model() {
158            Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
159            None => Some(query_predicate.clone()),
160        }
161    }
162
163    /// Return the executor-facing residual predicate after removing any
164    /// filtered-index guard clauses and fixed access-bound equalities already
165    /// guaranteed by the chosen path.
166    #[must_use]
167    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<PredicateExecutionModel> {
168        // Phase 1: strip only filtered-index guard clauses the chosen access
169        // path already proves.
170        let filtered_residual = self.execution_preparation_predicate();
171        let filtered_residual = filtered_residual.as_ref()?;
172
173        // Phase 2: strip any additional equality clauses already guaranteed by
174        // the concrete access-path bounds, such as `tier = 'gold'` on one
175        // selected `IndexPrefix(tier='gold', ...)` route.
176        residual_query_predicate_after_access_path_bounds(self.access.as_path(), filtered_residual)
177    }
178
179    /// Borrow the planner-compiled execution-preparation predicate program.
180    #[must_use]
181    pub(in crate::db) const fn execution_preparation_compiled_predicate(
182        &self,
183    ) -> Option<&PredicateProgram> {
184        self.static_planning_shape()
185            .execution_preparation_compiled_predicate
186            .as_ref()
187    }
188
189    /// Borrow the planner-compiled effective runtime predicate program.
190    #[must_use]
191    pub(in crate::db) const fn effective_runtime_compiled_predicate(
192        &self,
193    ) -> Option<&PredicateProgram> {
194        self.static_planning_shape()
195            .effective_runtime_compiled_predicate
196            .as_ref()
197    }
198
199    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
200    #[must_use]
201    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
202        if !self.scalar_plan().distinct {
203            return DistinctExecutionStrategy::None;
204        }
205
206        // DISTINCT on duplicate-safe single-path access shapes is a planner
207        // no-op for runtime dedup mechanics. Composite shapes can surface
208        // duplicate keys and therefore retain explicit dedup execution.
209        match distinct_runtime_dedup_strategy(&self.access) {
210            Some(strategy) => strategy,
211            None => DistinctExecutionStrategy::None,
212        }
213    }
214
215    /// Freeze one planner-owned route profile after model validation completes.
216    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
217        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
218    }
219
220    /// Freeze planner-owned executor metadata after logical/access planning completes.
221    pub(in crate::db) fn finalize_static_planning_shape_for_model(
222        &mut self,
223        model: &EntityModel,
224    ) -> Result<(), InternalError> {
225        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
226
227        Ok(())
228    }
229
230    /// Build one immutable execution-shape signature contract for runtime layers.
231    #[must_use]
232    pub(in crate::db) fn execution_shape_signature(
233        &self,
234        entity_path: &'static str,
235    ) -> ExecutionShapeSignature {
236        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
237    }
238
239    /// Return whether the chosen access contract fully satisfies the current
240    /// scalar query predicate without any additional post-access filtering.
241    #[must_use]
242    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
243        self.scalar_plan().predicate.is_some() && self.effective_execution_predicate().is_none()
244    }
245
246    /// Return whether the scalar logical predicate still requires post-access
247    /// filtering after accounting for filtered-index guard predicates and
248    /// access-path equality bounds.
249    #[must_use]
250    pub(in crate::db) fn has_residual_predicate(&self) -> bool {
251        self.scalar_plan().predicate.is_some()
252            && !self.predicate_fully_satisfied_by_access_contract()
253    }
254
255    /// Borrow the planner-frozen compiled scalar projection program.
256    #[must_use]
257    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
258        self.static_planning_shape()
259            .scalar_projection_plan
260            .as_deref()
261    }
262
263    /// Borrow the planner-frozen primary-key field name.
264    #[must_use]
265    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
266        self.static_planning_shape().primary_key_name
267    }
268
269    /// Borrow the planner-frozen projection slot reachability set.
270    #[must_use]
271    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
272        self.static_planning_shape()
273            .projection_referenced_slots
274            .as_slice()
275    }
276
277    /// Borrow the planner-frozen mask for direct projected output slots.
278    #[must_use]
279    #[cfg(any(test, feature = "diagnostics"))]
280    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
281        self.static_planning_shape().projected_slot_mask.as_slice()
282    }
283
284    /// Return whether projection remains the full model-identity field list.
285    #[must_use]
286    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
287        self.static_planning_shape().projection_is_model_identity
288    }
289
290    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
291    #[must_use]
292    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
293        self.static_planning_shape()
294            .order_referenced_slots
295            .as_deref()
296    }
297
298    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
299    #[must_use]
300    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
301        self.static_planning_shape().resolved_order.as_ref()
302    }
303
304    /// Borrow the planner-frozen access slot map used by index predicate compilation.
305    #[must_use]
306    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
307        self.static_planning_shape().slot_map.as_deref()
308    }
309
310    /// Borrow grouped aggregate execution specs already resolved during static planning.
311    #[must_use]
312    pub(in crate::db) fn grouped_aggregate_execution_specs(
313        &self,
314    ) -> Option<&[GroupedAggregateExecutionSpec]> {
315        self.static_planning_shape()
316            .grouped_aggregate_execution_specs
317            .as_deref()
318    }
319
320    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
321    #[must_use]
322    pub(in crate::db) const fn grouped_distinct_execution_strategy(
323        &self,
324    ) -> Option<&GroupedDistinctExecutionStrategy> {
325        self.static_planning_shape()
326            .grouped_distinct_execution_strategy
327            .as_ref()
328    }
329
330    /// Borrow the frozen projection semantic shape without reopening model ownership.
331    #[must_use]
332    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
333        &self.static_planning_shape().projection_spec
334    }
335
336    /// Borrow the frozen direct projection slots without reopening model ownership.
337    #[must_use]
338    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
339        self.static_planning_shape()
340            .projection_direct_slots
341            .as_deref()
342    }
343
344    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
345    #[must_use]
346    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
347        self.static_planning_shape()
348            .index_compile_targets
349            .as_deref()
350    }
351
352    const fn static_planning_shape(&self) -> &StaticPlanningShape {
353        self.static_planning_shape
354            .as_ref()
355            .expect("access-planned queries must freeze static planning shape before execution")
356    }
357}
358
359fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
360    match access {
361        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
362            Some(DistinctExecutionStrategy::PreOrdered)
363        }
364        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
365            Some(DistinctExecutionStrategy::HashMaterialize)
366        }
367        AccessPlan::Path(_) => None,
368    }
369}
370
371fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
372    let is_grouped_safe = plan
373        .grouped_plan()
374        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
375
376    ContinuationPolicy::new(
377        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
378        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
379        is_grouped_safe,
380    )
381}
382
383/// Project one planner-owned route profile from the finalized logical+access plan.
384#[must_use]
385pub(in crate::db) fn project_planner_route_profile_for_model(
386    model: &EntityModel,
387    plan: &AccessPlannedQuery,
388) -> PlannerRouteProfile {
389    let secondary_order_contract = plan
390        .scalar_plan()
391        .order
392        .as_ref()
393        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
394
395    PlannerRouteProfile::new(
396        derive_continuation_policy_validated(plan),
397        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
398        secondary_order_contract,
399    )
400}
401
402fn project_static_planning_shape_for_model(
403    model: &EntityModel,
404    plan: &AccessPlannedQuery,
405) -> Result<StaticPlanningShape, InternalError> {
406    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
407    let execution_preparation_compiled_predicate =
408        compile_optional_predicate(model, plan.execution_preparation_predicate().as_ref());
409    let effective_runtime_compiled_predicate =
410        compile_optional_predicate(model, plan.effective_execution_predicate().as_ref());
411    let scalar_projection_plan =
412        if plan.grouped_plan().is_none() {
413            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
414            InternalError::query_executor_invariant(
415                "scalar projection program must compile during static planning finalization",
416            )
417        })?)
418        } else {
419            None
420        };
421    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
422        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
423    let projection_direct_slots =
424        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
425    let projection_referenced_slots =
426        projection_referenced_slots_for_spec(model, &projection_spec)?;
427    let projected_slot_mask =
428        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
429    let projection_is_model_identity =
430        projection_is_model_identity_for_spec(model, &projection_spec);
431    let resolved_order = resolved_order_for_plan(model, plan)?;
432    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
433    let slot_map = slot_map_for_model_plan(model, plan);
434    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
435
436    Ok(StaticPlanningShape {
437        primary_key_name: model.primary_key.name,
438        projection_spec,
439        execution_preparation_compiled_predicate,
440        effective_runtime_compiled_predicate,
441        scalar_projection_plan,
442        grouped_aggregate_execution_specs,
443        grouped_distinct_execution_strategy,
444        projection_direct_slots,
445        projection_referenced_slots,
446        projected_slot_mask,
447        projection_is_model_identity,
448        resolved_order,
449        order_referenced_slots,
450        slot_map,
451        index_compile_targets,
452    })
453}
454
455// Compile one optional planner-frozen predicate program while keeping the
456// static planning assembly path free of repeated `Option` mapping boilerplate.
457fn compile_optional_predicate(
458    model: &EntityModel,
459    predicate: Option<&PredicateExecutionModel>,
460) -> Option<PredicateProgram> {
461    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
462}
463
464// Resolve the grouped-only static planning semantics bundle once so grouped
465// aggregate execution specs and grouped DISTINCT strategy stay derived under
466// one shared grouped-plan branch.
467fn resolve_grouped_static_planning_semantics(
468    model: &EntityModel,
469    plan: &AccessPlannedQuery,
470    projection_spec: &ProjectionSpec,
471) -> Result<
472    (
473        Option<Vec<GroupedAggregateExecutionSpec>>,
474        Option<GroupedDistinctExecutionStrategy>,
475    ),
476    InternalError,
477> {
478    let Some(grouped) = plan.grouped_plan() else {
479        return Ok((None, None));
480    };
481
482    let mut aggregate_projection_specs = grouped_aggregate_projection_specs_from_projection_spec(
483        projection_spec,
484        grouped.group.group_fields.as_slice(),
485        grouped.group.aggregates.as_slice(),
486    )?;
487    extend_grouped_having_aggregate_projection_specs(&mut aggregate_projection_specs, grouped)?;
488
489    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
490        model,
491        aggregate_projection_specs.as_slice(),
492    )?);
493    let grouped_distinct_execution_strategy =
494        Some(resolved_grouped_distinct_execution_strategy_for_model(
495            model,
496            grouped.group.group_fields.as_slice(),
497            grouped.group.aggregates.as_slice(),
498            grouped.having_expr.as_ref(),
499        )?);
500
501    Ok((
502        grouped_aggregate_execution_specs,
503        grouped_distinct_execution_strategy,
504    ))
505}
506
507fn extend_grouped_having_aggregate_projection_specs(
508    aggregate_projection_specs: &mut Vec<GroupedAggregateProjectionSpec>,
509    grouped: &GroupPlan,
510) -> Result<(), InternalError> {
511    if let Some(having_expr) = grouped.having_expr.as_ref() {
512        collect_grouped_having_expr_aggregate_projection_specs(
513            aggregate_projection_specs,
514            grouped,
515            having_expr,
516        )?;
517    }
518
519    Ok(())
520}
521
522fn collect_grouped_having_expr_aggregate_projection_specs(
523    aggregate_projection_specs: &mut Vec<GroupedAggregateProjectionSpec>,
524    grouped: &GroupPlan,
525    expr: &GroupHavingExpr,
526) -> Result<(), InternalError> {
527    match expr {
528        GroupHavingExpr::Compare { left, right, .. } => {
529            collect_grouped_having_value_expr_aggregate_projection_specs(
530                aggregate_projection_specs,
531                grouped,
532                left,
533            )?;
534            collect_grouped_having_value_expr_aggregate_projection_specs(
535                aggregate_projection_specs,
536                grouped,
537                right,
538            )?;
539        }
540        GroupHavingExpr::And(children) => {
541            for child in children {
542                collect_grouped_having_expr_aggregate_projection_specs(
543                    aggregate_projection_specs,
544                    grouped,
545                    child,
546                )?;
547            }
548        }
549    }
550
551    Ok(())
552}
553
554fn collect_grouped_having_value_expr_aggregate_projection_specs(
555    aggregate_projection_specs: &mut Vec<GroupedAggregateProjectionSpec>,
556    grouped: &GroupPlan,
557    expr: &GroupHavingValueExpr,
558) -> Result<(), InternalError> {
559    match expr {
560        GroupHavingValueExpr::GroupField(_) | GroupHavingValueExpr::Literal(_) => {}
561        GroupHavingValueExpr::AggregateIndex(aggregate_index) => {
562            push_grouped_having_aggregate_projection_spec(
563                aggregate_projection_specs,
564                grouped,
565                *aggregate_index,
566            )?;
567        }
568        GroupHavingValueExpr::FunctionCall { args, .. } => {
569            for arg in args {
570                collect_grouped_having_value_expr_aggregate_projection_specs(
571                    aggregate_projection_specs,
572                    grouped,
573                    arg,
574                )?;
575            }
576        }
577        GroupHavingValueExpr::Unary { expr, .. } => {
578            collect_grouped_having_value_expr_aggregate_projection_specs(
579                aggregate_projection_specs,
580                grouped,
581                expr,
582            )?;
583        }
584        GroupHavingValueExpr::Case {
585            when_then_arms,
586            else_expr,
587        } => {
588            for arm in when_then_arms {
589                collect_grouped_having_value_expr_aggregate_projection_specs(
590                    aggregate_projection_specs,
591                    grouped,
592                    arm.condition(),
593                )?;
594                collect_grouped_having_value_expr_aggregate_projection_specs(
595                    aggregate_projection_specs,
596                    grouped,
597                    arm.result(),
598                )?;
599            }
600            collect_grouped_having_value_expr_aggregate_projection_specs(
601                aggregate_projection_specs,
602                grouped,
603                else_expr,
604            )?;
605        }
606        GroupHavingValueExpr::Binary { left, right, .. } => {
607            collect_grouped_having_value_expr_aggregate_projection_specs(
608                aggregate_projection_specs,
609                grouped,
610                left,
611            )?;
612            collect_grouped_having_value_expr_aggregate_projection_specs(
613                aggregate_projection_specs,
614                grouped,
615                right,
616            )?;
617        }
618    }
619
620    Ok(())
621}
622
623fn push_grouped_having_aggregate_projection_spec(
624    aggregate_projection_specs: &mut Vec<GroupedAggregateProjectionSpec>,
625    grouped: &GroupPlan,
626    aggregate_index: usize,
627) -> Result<(), InternalError> {
628    let Some(aggregate) = grouped.group.aggregates.get(aggregate_index) else {
629        return Err(InternalError::planner_executor_invariant(format!(
630            "grouped static planning semantics referenced HAVING aggregate index {aggregate_index} but aggregate_count={}",
631            grouped.group.aggregates.len(),
632        )));
633    };
634    let aggregate_projection_spec =
635        GroupedAggregateProjectionSpec::from_group_aggregate_spec(aggregate);
636
637    if aggregate_projection_specs
638        .iter()
639        .all(|current| current != &aggregate_projection_spec)
640    {
641        aggregate_projection_specs.push(aggregate_projection_spec);
642    }
643
644    Ok(())
645}
646
647fn projection_referenced_slots_for_spec(
648    model: &EntityModel,
649    projection: &ProjectionSpec,
650) -> Result<Vec<usize>, InternalError> {
651    let mut referenced = vec![false; model.fields().len()];
652
653    for field in projection.fields() {
654        mark_projection_expr_slots(
655            model,
656            projection_field_expr(field),
657            referenced.as_mut_slice(),
658        )?;
659    }
660
661    Ok(referenced
662        .into_iter()
663        .enumerate()
664        .filter_map(|(slot, required)| required.then_some(slot))
665        .collect())
666}
667
668fn mark_projection_expr_slots(
669    model: &EntityModel,
670    expr: &Expr,
671    referenced: &mut [bool],
672) -> Result<(), InternalError> {
673    match expr {
674        Expr::Field(field_id) => {
675            let field_name = field_id.as_str();
676            let slot = resolve_required_field_slot(model, field_name, || {
677                InternalError::query_invalid_logical_plan(format!(
678                    "projection expression references unknown field '{field_name}'",
679                ))
680            })?;
681            referenced[slot] = true;
682        }
683        Expr::Literal(_) => {}
684        Expr::FunctionCall { args, .. } => {
685            for arg in args {
686                mark_projection_expr_slots(model, arg, referenced)?;
687            }
688        }
689        Expr::Case {
690            when_then_arms,
691            else_expr,
692        } => {
693            for arm in when_then_arms {
694                mark_projection_expr_slots(model, arm.condition(), referenced)?;
695                mark_projection_expr_slots(model, arm.result(), referenced)?;
696            }
697            mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
698        }
699        Expr::Aggregate(_) => {}
700        #[cfg(test)]
701        Expr::Alias { expr, .. } => {
702            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
703        }
704        Expr::Unary { expr, .. } => {
705            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
706        }
707        Expr::Binary { left, right, .. } => {
708            mark_projection_expr_slots(model, left.as_ref(), referenced)?;
709            mark_projection_expr_slots(model, right.as_ref(), referenced)?;
710        }
711    }
712
713    Ok(())
714}
715
716fn projected_slot_mask_for_spec(
717    model: &EntityModel,
718    direct_projection_slots: Option<&[usize]>,
719) -> Vec<bool> {
720    let mut projected_slots = vec![false; model.fields().len()];
721
722    let Some(direct_projection_slots) = direct_projection_slots else {
723        return projected_slots;
724    };
725
726    for slot in direct_projection_slots.iter().copied() {
727        if let Some(projected) = projected_slots.get_mut(slot) {
728            *projected = true;
729        }
730    }
731
732    projected_slots
733}
734
735fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
736    if projection.len() != model.fields().len() {
737        return false;
738    }
739
740    for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
741        match projected_field {
742            ProjectionField::Scalar {
743                expr: Expr::Field(field_id),
744                alias: None,
745            } if field_id.as_str() == field_model.name() => {}
746            ProjectionField::Scalar { .. } => return false,
747        }
748    }
749
750    true
751}
752
753fn resolved_order_for_plan(
754    model: &EntityModel,
755    plan: &AccessPlannedQuery,
756) -> Result<Option<ResolvedOrder>, InternalError> {
757    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
758        return Ok(None);
759    }
760
761    let Some(order) = plan.scalar_plan().order.as_ref() else {
762        return Ok(None);
763    };
764
765    let mut fields = Vec::with_capacity(order.fields.len());
766    for (field, direction) in &order.fields {
767        fields.push(ResolvedOrderField::new(
768            resolved_order_value_source_for_field(model, field)?,
769            *direction,
770        ));
771    }
772
773    Ok(Some(ResolvedOrder::new(fields)))
774}
775
776fn resolved_order_value_source_for_field(
777    model: &EntityModel,
778    field: &str,
779) -> Result<ResolvedOrderValueSource, InternalError> {
780    if let Some(expr) = parse_supported_computed_order_expr(field) {
781        validate_resolved_order_expr_fields(model, &expr, field)?;
782        let compiled = compile_scalar_projection_expr(model, &expr)
783            .ok_or_else(|| order_expression_scalar_seam_error(field))?;
784
785        return Ok(ResolvedOrderValueSource::expression(compiled));
786    }
787
788    let slot = resolve_required_field_slot(model, field, || {
789        InternalError::query_invalid_logical_plan(format!(
790            "order expression references unknown field '{field}'",
791        ))
792    })?;
793
794    Ok(ResolvedOrderValueSource::direct_field(slot))
795}
796
797fn validate_resolved_order_expr_fields(
798    model: &EntityModel,
799    expr: &Expr,
800    rendered: &str,
801) -> Result<(), InternalError> {
802    match expr {
803        Expr::Field(field_id) => {
804            resolve_required_field_slot(model, field_id.as_str(), || {
805                InternalError::query_invalid_logical_plan(format!(
806                    "order expression references unknown field '{rendered}'",
807                ))
808            })?;
809        }
810        Expr::Literal(_) => {}
811        Expr::FunctionCall { args, .. } => {
812            for arg in args {
813                validate_resolved_order_expr_fields(model, arg, rendered)?;
814            }
815        }
816        Expr::Case {
817            when_then_arms,
818            else_expr,
819        } => {
820            for arm in when_then_arms {
821                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
822                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
823            }
824            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
825        }
826        Expr::Binary { left, right, .. } => {
827            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
828            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
829        }
830        Expr::Aggregate(_) => {
831            return Err(order_expression_scalar_seam_error(rendered));
832        }
833        #[cfg(test)]
834        Expr::Alias { .. } => {
835            return Err(order_expression_scalar_seam_error(rendered));
836        }
837        Expr::Unary { .. } => {
838            return Err(order_expression_scalar_seam_error(rendered));
839        }
840    }
841
842    Ok(())
843}
844
845// Resolve one model field slot while keeping planner invalid-logical-plan
846// error construction at the callsite that owns the diagnostic wording.
847fn resolve_required_field_slot<F>(
848    model: &EntityModel,
849    field: &str,
850    invalid_plan_error: F,
851) -> Result<usize, InternalError>
852where
853    F: FnOnce() -> InternalError,
854{
855    resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
856}
857
858// Keep the scalar-order expression seam violation text under one helper so the
859// parse validation and compile validation paths do not drift.
860fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
861    InternalError::query_invalid_logical_plan(format!(
862        "order expression '{rendered}' did not stay on the scalar expression seam",
863    ))
864}
865
866// Keep one stable executor-facing slot list for grouped order terms after the
867// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
868// now consumes this same referenced-slot contract instead of re-deriving order
869// sources from planner strategy at runtime.
870fn order_referenced_slots_for_resolved_order(
871    resolved_order: Option<&ResolvedOrder>,
872) -> Option<Vec<usize>> {
873    let resolved_order = resolved_order?;
874    let mut referenced = Vec::new();
875
876    // Keep one stable slot list without re-parsing order expressions after the
877    // planner has already frozen structural ORDER BY sources.
878    for field in resolved_order.fields() {
879        field.source().extend_referenced_slots(&mut referenced);
880    }
881
882    Some(referenced)
883}
884
885fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
886    let access_strategy = plan.access.resolve_strategy();
887    let executable = access_strategy.executable();
888
889    resolved_index_slots_for_access_path(model, executable)
890}
891
892fn resolved_index_slots_for_access_path(
893    model: &EntityModel,
894    access: &ExecutableAccessPlan<'_, crate::value::Value>,
895) -> Option<Vec<usize>> {
896    let path = access.as_path()?;
897    let path_capabilities = path.capabilities();
898    let index_fields = path_capabilities.index_fields_for_slot_map()?;
899    let mut slots = Vec::with_capacity(index_fields.len());
900
901    for field_name in index_fields {
902        let slot = resolve_field_slot(model, field_name)?;
903        slots.push(slot);
904    }
905
906    Some(slots)
907}
908
909fn index_compile_targets_for_model_plan(
910    model: &EntityModel,
911    plan: &AccessPlannedQuery,
912) -> Option<Vec<IndexCompileTarget>> {
913    let index = plan.access.as_path()?.selected_index_model()?;
914    let mut targets = Vec::new();
915
916    match index.key_items() {
917        IndexKeyItemsRef::Fields(fields) => {
918            for (component_index, &field_name) in fields.iter().enumerate() {
919                let field_slot = resolve_field_slot(model, field_name)?;
920                targets.push(IndexCompileTarget {
921                    component_index,
922                    field_slot,
923                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
924                });
925            }
926        }
927        IndexKeyItemsRef::Items(items) => {
928            for (component_index, &key_item) in items.iter().enumerate() {
929                let field_slot = resolve_field_slot(model, key_item.field())?;
930                targets.push(IndexCompileTarget {
931                    component_index,
932                    field_slot,
933                    key_item,
934                });
935            }
936        }
937    }
938
939    Some(targets)
940}