Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::IndexCompileTarget,
10        predicate::{PredicateExecutionModel, PredicateProgram},
11        query::plan::{
12            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
13            ExecutionShapeSignature, GroupPlan, GroupedAggregateExecutionSpec,
14            GroupedDistinctExecutionStrategy, GroupedPlanStrategy, LogicalPlan,
15            PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
16            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
17            derive_logical_pushdown_eligibility,
18            expr::{
19                Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
20                compile_scalar_projection_expr, compile_scalar_projection_plan,
21                parse_supported_computed_order_expr, projection_field_expr,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
25            lower_projection_identity, lower_projection_intent,
26            residual_query_predicate_after_access_path_bounds,
27            residual_query_predicate_after_filtered_access,
28            resolved_grouped_distinct_execution_strategy_for_model,
29        },
30    },
31    error::InternalError,
32    model::{
33        entity::{EntityModel, resolve_field_slot},
34        index::IndexKeyItemsRef,
35    },
36};
37
38impl QueryMode {
39    /// True if this mode represents a load intent.
40    #[must_use]
41    pub const fn is_load(&self) -> bool {
42        match self {
43            Self::Load(_) => true,
44            Self::Delete(_) => false,
45        }
46    }
47
48    /// True if this mode represents a delete intent.
49    #[must_use]
50    pub const fn is_delete(&self) -> bool {
51        match self {
52            Self::Delete(_) => true,
53            Self::Load(_) => false,
54        }
55    }
56}
57
58impl LogicalPlan {
59    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
60    #[must_use]
61    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62        match self {
63            Self::Scalar(plan) => plan,
64            Self::Grouped(plan) => &plan.scalar,
65        }
66    }
67
68    /// Borrow scalar semantic fields mutably across logical variants for tests.
69    #[must_use]
70    #[cfg(test)]
71    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72        match self {
73            Self::Scalar(plan) => plan,
74            Self::Grouped(plan) => &mut plan.scalar,
75        }
76    }
77
78    /// Test-only shorthand for explicit scalar semantic borrowing.
79    #[must_use]
80    #[cfg(test)]
81    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82        self.scalar_semantics()
83    }
84
85    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
86    #[must_use]
87    #[cfg(test)]
88    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89        self.scalar_semantics_mut()
90    }
91}
92
93impl AccessPlannedQuery {
94    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
95    #[must_use]
96    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97        self.logical.scalar_semantics()
98    }
99
100    /// Borrow scalar semantic fields mutably across logical variants for tests.
101    #[must_use]
102    #[cfg(test)]
103    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
104        self.logical.scalar_semantics_mut()
105    }
106
107    /// Test-only shorthand for explicit scalar plan borrowing.
108    #[must_use]
109    #[cfg(test)]
110    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
111        self.scalar_plan()
112    }
113
114    /// Test-only shorthand for explicit mutable scalar plan borrowing.
115    #[must_use]
116    #[cfg(test)]
117    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
118        self.scalar_plan_mut()
119    }
120
121    /// Borrow grouped semantic fields when this plan is grouped.
122    #[must_use]
123    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
124        match &self.logical {
125            LogicalPlan::Scalar(_) => None,
126            LogicalPlan::Grouped(plan) => Some(plan),
127        }
128    }
129
130    /// Lower this plan into one canonical planner-owned projection semantic spec.
131    #[must_use]
132    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
133        if let Some(static_shape) = &self.static_planning_shape {
134            return static_shape.projection_spec.clone();
135        }
136
137        lower_projection_intent(model, &self.logical, &self.projection_selection)
138    }
139
140    /// Lower this plan into one projection semantic shape for identity hashing.
141    #[must_use]
142    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
143        lower_projection_identity(&self.logical)
144    }
145
146    /// Return the executor-facing predicate after removing only filtered-index
147    /// guard clauses the chosen access path already proves.
148    ///
149    /// This conservative form is used by preparation/explain surfaces that
150    /// still need to see access-bound equalities as index-predicate input.
151    #[must_use]
152    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<PredicateExecutionModel> {
153        let query_predicate = self.scalar_plan().predicate.as_ref()?;
154
155        match self.access.selected_index_model() {
156            Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
157            None => Some(query_predicate.clone()),
158        }
159    }
160
161    /// Return the executor-facing residual predicate after removing any
162    /// filtered-index guard clauses and fixed access-bound equalities already
163    /// guaranteed by the chosen path.
164    #[must_use]
165    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<PredicateExecutionModel> {
166        // Phase 1: strip only filtered-index guard clauses the chosen access
167        // path already proves.
168        let filtered_residual = self.execution_preparation_predicate();
169        let filtered_residual = filtered_residual.as_ref()?;
170
171        // Phase 2: strip any additional equality clauses already guaranteed by
172        // the concrete access-path bounds, such as `tier = 'gold'` on one
173        // selected `IndexPrefix(tier='gold', ...)` route.
174        residual_query_predicate_after_access_path_bounds(self.access.as_path(), filtered_residual)
175    }
176
177    /// Borrow the planner-compiled execution-preparation predicate program.
178    #[must_use]
179    pub(in crate::db) const fn execution_preparation_compiled_predicate(
180        &self,
181    ) -> Option<&PredicateProgram> {
182        self.static_planning_shape()
183            .execution_preparation_compiled_predicate
184            .as_ref()
185    }
186
187    /// Borrow the planner-compiled effective runtime predicate program.
188    #[must_use]
189    pub(in crate::db) const fn effective_runtime_compiled_predicate(
190        &self,
191    ) -> Option<&PredicateProgram> {
192        self.static_planning_shape()
193            .effective_runtime_compiled_predicate
194            .as_ref()
195    }
196
197    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
198    #[must_use]
199    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
200        if !self.scalar_plan().distinct {
201            return DistinctExecutionStrategy::None;
202        }
203
204        // DISTINCT on duplicate-safe single-path access shapes is a planner
205        // no-op for runtime dedup mechanics. Composite shapes can surface
206        // duplicate keys and therefore retain explicit dedup execution.
207        match distinct_runtime_dedup_strategy(&self.access) {
208            Some(strategy) => strategy,
209            None => DistinctExecutionStrategy::None,
210        }
211    }
212
213    /// Freeze one planner-owned route profile after model validation completes.
214    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
215        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
216    }
217
218    /// Freeze planner-owned executor metadata after logical/access planning completes.
219    pub(in crate::db) fn finalize_static_planning_shape_for_model(
220        &mut self,
221        model: &EntityModel,
222    ) -> Result<(), InternalError> {
223        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
224
225        Ok(())
226    }
227
228    /// Build one immutable execution-shape signature contract for runtime layers.
229    #[must_use]
230    pub(in crate::db) fn execution_shape_signature(
231        &self,
232        entity_path: &'static str,
233    ) -> ExecutionShapeSignature {
234        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
235    }
236
237    /// Return whether the chosen access contract fully satisfies the current
238    /// scalar query predicate without any additional post-access filtering.
239    #[must_use]
240    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
241        self.scalar_plan().predicate.is_some() && self.effective_execution_predicate().is_none()
242    }
243
244    /// Return whether the scalar logical predicate still requires post-access
245    /// filtering after accounting for filtered-index guard predicates and
246    /// access-path equality bounds.
247    #[must_use]
248    pub(in crate::db) fn has_residual_predicate(&self) -> bool {
249        self.scalar_plan().predicate.is_some()
250            && !self.predicate_fully_satisfied_by_access_contract()
251    }
252
253    /// Borrow the planner-frozen compiled scalar projection program.
254    #[must_use]
255    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
256        self.static_planning_shape()
257            .scalar_projection_plan
258            .as_deref()
259    }
260
261    /// Borrow the planner-frozen primary-key field name.
262    #[must_use]
263    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
264        self.static_planning_shape().primary_key_name
265    }
266
267    /// Borrow the planner-frozen projection slot reachability set.
268    #[must_use]
269    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
270        self.static_planning_shape()
271            .projection_referenced_slots
272            .as_slice()
273    }
274
275    /// Borrow the planner-frozen mask for direct projected output slots.
276    #[must_use]
277    #[cfg(any(test, feature = "diagnostics"))]
278    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
279        self.static_planning_shape().projected_slot_mask.as_slice()
280    }
281
282    /// Return whether projection remains the full model-identity field list.
283    #[must_use]
284    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
285        self.static_planning_shape().projection_is_model_identity
286    }
287
288    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
289    #[must_use]
290    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
291        self.static_planning_shape()
292            .order_referenced_slots
293            .as_deref()
294    }
295
296    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
297    #[must_use]
298    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
299        self.static_planning_shape().resolved_order.as_ref()
300    }
301
302    /// Borrow the planner-frozen access slot map used by index predicate compilation.
303    #[must_use]
304    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
305        self.static_planning_shape().slot_map.as_deref()
306    }
307
308    /// Borrow grouped aggregate execution specs already resolved during static planning.
309    #[must_use]
310    pub(in crate::db) fn grouped_aggregate_execution_specs(
311        &self,
312    ) -> Option<&[GroupedAggregateExecutionSpec]> {
313        self.static_planning_shape()
314            .grouped_aggregate_execution_specs
315            .as_deref()
316    }
317
318    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
319    #[must_use]
320    pub(in crate::db) const fn grouped_distinct_execution_strategy(
321        &self,
322    ) -> Option<&GroupedDistinctExecutionStrategy> {
323        self.static_planning_shape()
324            .grouped_distinct_execution_strategy
325            .as_ref()
326    }
327
328    /// Borrow the frozen projection semantic shape without reopening model ownership.
329    #[must_use]
330    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
331        &self.static_planning_shape().projection_spec
332    }
333
334    /// Borrow the frozen direct projection slots without reopening model ownership.
335    #[must_use]
336    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
337        self.static_planning_shape()
338            .projection_direct_slots
339            .as_deref()
340    }
341
342    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
343    #[must_use]
344    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
345        self.static_planning_shape()
346            .index_compile_targets
347            .as_deref()
348    }
349
350    const fn static_planning_shape(&self) -> &StaticPlanningShape {
351        self.static_planning_shape
352            .as_ref()
353            .expect("access-planned queries must freeze static planning shape before execution")
354    }
355}
356
357fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
358    match access {
359        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
360            Some(DistinctExecutionStrategy::PreOrdered)
361        }
362        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
363            Some(DistinctExecutionStrategy::HashMaterialize)
364        }
365        AccessPlan::Path(_) => None,
366    }
367}
368
369fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
370    let is_grouped_safe = plan
371        .grouped_plan()
372        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
373
374    ContinuationPolicy::new(
375        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
376        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
377        is_grouped_safe,
378    )
379}
380
381/// Project one planner-owned route profile from the finalized logical+access plan.
382#[must_use]
383pub(in crate::db) fn project_planner_route_profile_for_model(
384    model: &EntityModel,
385    plan: &AccessPlannedQuery,
386) -> PlannerRouteProfile {
387    let secondary_order_contract = plan
388        .scalar_plan()
389        .order
390        .as_ref()
391        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
392
393    PlannerRouteProfile::new(
394        derive_continuation_policy_validated(plan),
395        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
396        secondary_order_contract,
397    )
398}
399
400fn project_static_planning_shape_for_model(
401    model: &EntityModel,
402    plan: &AccessPlannedQuery,
403) -> Result<StaticPlanningShape, InternalError> {
404    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
405    let execution_preparation_compiled_predicate =
406        compile_optional_predicate(model, plan.execution_preparation_predicate().as_ref());
407    let effective_runtime_compiled_predicate =
408        compile_optional_predicate(model, plan.effective_execution_predicate().as_ref());
409    let scalar_projection_plan =
410        if plan.grouped_plan().is_none() {
411            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
412            InternalError::query_executor_invariant(
413                "scalar projection program must compile during static planning finalization",
414            )
415        })?)
416        } else {
417            None
418        };
419    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
420        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
421    let projection_direct_slots =
422        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
423    let projection_referenced_slots =
424        projection_referenced_slots_for_spec(model, &projection_spec)?;
425    let projected_slot_mask =
426        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
427    let projection_is_model_identity =
428        projection_is_model_identity_for_spec(model, &projection_spec);
429    let resolved_order = resolved_order_for_plan(model, plan)?;
430    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
431    let slot_map = slot_map_for_model_plan(model, plan);
432    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
433
434    Ok(StaticPlanningShape {
435        primary_key_name: model.primary_key.name,
436        projection_spec,
437        execution_preparation_compiled_predicate,
438        effective_runtime_compiled_predicate,
439        scalar_projection_plan,
440        grouped_aggregate_execution_specs,
441        grouped_distinct_execution_strategy,
442        projection_direct_slots,
443        projection_referenced_slots,
444        projected_slot_mask,
445        projection_is_model_identity,
446        resolved_order,
447        order_referenced_slots,
448        slot_map,
449        index_compile_targets,
450    })
451}
452
453// Compile one optional planner-frozen predicate program while keeping the
454// static planning assembly path free of repeated `Option` mapping boilerplate.
455fn compile_optional_predicate(
456    model: &EntityModel,
457    predicate: Option<&PredicateExecutionModel>,
458) -> Option<PredicateProgram> {
459    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
460}
461
462// Resolve the grouped-only static planning semantics bundle once so grouped
463// aggregate execution specs and grouped DISTINCT strategy stay derived under
464// one shared grouped-plan branch.
465fn resolve_grouped_static_planning_semantics(
466    model: &EntityModel,
467    plan: &AccessPlannedQuery,
468    projection_spec: &ProjectionSpec,
469) -> Result<
470    (
471        Option<Vec<GroupedAggregateExecutionSpec>>,
472        Option<GroupedDistinctExecutionStrategy>,
473    ),
474    InternalError,
475> {
476    let Some(grouped) = plan.grouped_plan() else {
477        return Ok((None, None));
478    };
479
480    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
481        projection_spec,
482        grouped.group.group_fields.as_slice(),
483        grouped.group.aggregates.as_slice(),
484    )?;
485    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
486
487    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
488        model,
489        aggregate_specs.as_slice(),
490    )?);
491    let grouped_distinct_execution_strategy =
492        Some(resolved_grouped_distinct_execution_strategy_for_model(
493            model,
494            grouped.group.group_fields.as_slice(),
495            grouped.group.aggregates.as_slice(),
496            grouped.having_expr.as_ref(),
497        )?);
498
499    Ok((
500        grouped_aggregate_execution_specs,
501        grouped_distinct_execution_strategy,
502    ))
503}
504
505fn extend_grouped_having_aggregate_specs(
506    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
507    grouped: &GroupPlan,
508) -> Result<(), InternalError> {
509    if let Some(having_expr) = grouped.having_expr.as_ref() {
510        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
511    }
512
513    Ok(())
514}
515
516fn collect_grouped_having_expr_aggregate_specs(
517    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
518    expr: &Expr,
519) -> Result<(), InternalError> {
520    match expr {
521        Expr::Aggregate(aggregate_expr) => {
522            let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
523
524            if aggregate_specs
525                .iter()
526                .all(|current| current != &aggregate_spec)
527            {
528                aggregate_specs.push(aggregate_spec);
529            }
530        }
531        Expr::Field(_) | Expr::Literal(_) => {}
532        Expr::FunctionCall { args, .. } => {
533            for arg in args {
534                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
535            }
536        }
537        Expr::Unary { expr, .. } => {
538            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
539        }
540        Expr::Case {
541            when_then_arms,
542            else_expr,
543        } => {
544            for arm in when_then_arms {
545                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
546                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
547            }
548
549            collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
550        }
551        Expr::Binary { left, right, .. } => {
552            collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
553            collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
554        }
555        #[cfg(test)]
556        Expr::Alias { expr, .. } => {
557            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
558        }
559    }
560
561    Ok(())
562}
563
564fn projection_referenced_slots_for_spec(
565    model: &EntityModel,
566    projection: &ProjectionSpec,
567) -> Result<Vec<usize>, InternalError> {
568    let mut referenced = vec![false; model.fields().len()];
569
570    for field in projection.fields() {
571        mark_projection_expr_slots(
572            model,
573            projection_field_expr(field),
574            referenced.as_mut_slice(),
575        )?;
576    }
577
578    Ok(referenced
579        .into_iter()
580        .enumerate()
581        .filter_map(|(slot, required)| required.then_some(slot))
582        .collect())
583}
584
585fn mark_projection_expr_slots(
586    model: &EntityModel,
587    expr: &Expr,
588    referenced: &mut [bool],
589) -> Result<(), InternalError> {
590    match expr {
591        Expr::Field(field_id) => {
592            let field_name = field_id.as_str();
593            let slot = resolve_required_field_slot(model, field_name, || {
594                InternalError::query_invalid_logical_plan(format!(
595                    "projection expression references unknown field '{field_name}'",
596                ))
597            })?;
598            referenced[slot] = true;
599        }
600        Expr::Literal(_) => {}
601        Expr::FunctionCall { args, .. } => {
602            for arg in args {
603                mark_projection_expr_slots(model, arg, referenced)?;
604            }
605        }
606        Expr::Case {
607            when_then_arms,
608            else_expr,
609        } => {
610            for arm in when_then_arms {
611                mark_projection_expr_slots(model, arm.condition(), referenced)?;
612                mark_projection_expr_slots(model, arm.result(), referenced)?;
613            }
614            mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
615        }
616        Expr::Aggregate(_) => {}
617        #[cfg(test)]
618        Expr::Alias { expr, .. } => {
619            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
620        }
621        Expr::Unary { expr, .. } => {
622            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
623        }
624        Expr::Binary { left, right, .. } => {
625            mark_projection_expr_slots(model, left.as_ref(), referenced)?;
626            mark_projection_expr_slots(model, right.as_ref(), referenced)?;
627        }
628    }
629
630    Ok(())
631}
632
633fn projected_slot_mask_for_spec(
634    model: &EntityModel,
635    direct_projection_slots: Option<&[usize]>,
636) -> Vec<bool> {
637    let mut projected_slots = vec![false; model.fields().len()];
638
639    let Some(direct_projection_slots) = direct_projection_slots else {
640        return projected_slots;
641    };
642
643    for slot in direct_projection_slots.iter().copied() {
644        if let Some(projected) = projected_slots.get_mut(slot) {
645            *projected = true;
646        }
647    }
648
649    projected_slots
650}
651
652fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
653    if projection.len() != model.fields().len() {
654        return false;
655    }
656
657    for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
658        match projected_field {
659            ProjectionField::Scalar {
660                expr: Expr::Field(field_id),
661                alias: None,
662            } if field_id.as_str() == field_model.name() => {}
663            ProjectionField::Scalar { .. } => return false,
664        }
665    }
666
667    true
668}
669
670fn resolved_order_for_plan(
671    model: &EntityModel,
672    plan: &AccessPlannedQuery,
673) -> Result<Option<ResolvedOrder>, InternalError> {
674    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
675        return Ok(None);
676    }
677
678    let Some(order) = plan.scalar_plan().order.as_ref() else {
679        return Ok(None);
680    };
681
682    let mut fields = Vec::with_capacity(order.fields.len());
683    for (field, direction) in &order.fields {
684        fields.push(ResolvedOrderField::new(
685            resolved_order_value_source_for_field(model, field)?,
686            *direction,
687        ));
688    }
689
690    Ok(Some(ResolvedOrder::new(fields)))
691}
692
693fn resolved_order_value_source_for_field(
694    model: &EntityModel,
695    field: &str,
696) -> Result<ResolvedOrderValueSource, InternalError> {
697    if let Some(expr) = parse_supported_computed_order_expr(field) {
698        validate_resolved_order_expr_fields(model, &expr, field)?;
699        let compiled = compile_scalar_projection_expr(model, &expr)
700            .ok_or_else(|| order_expression_scalar_seam_error(field))?;
701
702        return Ok(ResolvedOrderValueSource::expression(compiled));
703    }
704
705    let slot = resolve_required_field_slot(model, field, || {
706        InternalError::query_invalid_logical_plan(format!(
707            "order expression references unknown field '{field}'",
708        ))
709    })?;
710
711    Ok(ResolvedOrderValueSource::direct_field(slot))
712}
713
714fn validate_resolved_order_expr_fields(
715    model: &EntityModel,
716    expr: &Expr,
717    rendered: &str,
718) -> Result<(), InternalError> {
719    match expr {
720        Expr::Field(field_id) => {
721            resolve_required_field_slot(model, field_id.as_str(), || {
722                InternalError::query_invalid_logical_plan(format!(
723                    "order expression references unknown field '{rendered}'",
724                ))
725            })?;
726        }
727        Expr::Literal(_) => {}
728        Expr::FunctionCall { args, .. } => {
729            for arg in args {
730                validate_resolved_order_expr_fields(model, arg, rendered)?;
731            }
732        }
733        Expr::Case {
734            when_then_arms,
735            else_expr,
736        } => {
737            for arm in when_then_arms {
738                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
739                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
740            }
741            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
742        }
743        Expr::Binary { left, right, .. } => {
744            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
745            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
746        }
747        Expr::Aggregate(_) => {
748            return Err(order_expression_scalar_seam_error(rendered));
749        }
750        #[cfg(test)]
751        Expr::Alias { .. } => {
752            return Err(order_expression_scalar_seam_error(rendered));
753        }
754        Expr::Unary { .. } => {
755            return Err(order_expression_scalar_seam_error(rendered));
756        }
757    }
758
759    Ok(())
760}
761
762// Resolve one model field slot while keeping planner invalid-logical-plan
763// error construction at the callsite that owns the diagnostic wording.
764fn resolve_required_field_slot<F>(
765    model: &EntityModel,
766    field: &str,
767    invalid_plan_error: F,
768) -> Result<usize, InternalError>
769where
770    F: FnOnce() -> InternalError,
771{
772    resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
773}
774
775// Keep the scalar-order expression seam violation text under one helper so the
776// parse validation and compile validation paths do not drift.
777fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
778    InternalError::query_invalid_logical_plan(format!(
779        "order expression '{rendered}' did not stay on the scalar expression seam",
780    ))
781}
782
783// Keep one stable executor-facing slot list for grouped order terms after the
784// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
785// now consumes this same referenced-slot contract instead of re-deriving order
786// sources from planner strategy at runtime.
787fn order_referenced_slots_for_resolved_order(
788    resolved_order: Option<&ResolvedOrder>,
789) -> Option<Vec<usize>> {
790    let resolved_order = resolved_order?;
791    let mut referenced = Vec::new();
792
793    // Keep one stable slot list without re-parsing order expressions after the
794    // planner has already frozen structural ORDER BY sources.
795    for field in resolved_order.fields() {
796        field.source().extend_referenced_slots(&mut referenced);
797    }
798
799    Some(referenced)
800}
801
802fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
803    let access_strategy = plan.access.resolve_strategy();
804    let executable = access_strategy.executable();
805
806    resolved_index_slots_for_access_path(model, executable)
807}
808
809fn resolved_index_slots_for_access_path(
810    model: &EntityModel,
811    access: &ExecutableAccessPlan<'_, crate::value::Value>,
812) -> Option<Vec<usize>> {
813    let path = access.as_path()?;
814    let path_capabilities = path.capabilities();
815    let index_fields = path_capabilities.index_fields_for_slot_map()?;
816    let mut slots = Vec::with_capacity(index_fields.len());
817
818    for field_name in index_fields {
819        let slot = resolve_field_slot(model, field_name)?;
820        slots.push(slot);
821    }
822
823    Some(slots)
824}
825
826fn index_compile_targets_for_model_plan(
827    model: &EntityModel,
828    plan: &AccessPlannedQuery,
829) -> Option<Vec<IndexCompileTarget>> {
830    let index = plan.access.as_path()?.selected_index_model()?;
831    let mut targets = Vec::new();
832
833    match index.key_items() {
834        IndexKeyItemsRef::Fields(fields) => {
835            for (component_index, &field_name) in fields.iter().enumerate() {
836                let field_slot = resolve_field_slot(model, field_name)?;
837                targets.push(IndexCompileTarget {
838                    component_index,
839                    field_slot,
840                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
841                });
842            }
843        }
844        IndexKeyItemsRef::Items(items) => {
845            for (component_index, &key_item) in items.iter().enumerate() {
846                let field_slot = resolve_field_slot(model, key_item.field())?;
847                targets.push(IndexCompileTarget {
848                    component_index,
849                    field_slot,
850                    key_item,
851                });
852            }
853        }
854    }
855
856    Some(targets)
857}