Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{IndexCompileTarget, Predicate, PredicateProgram, normalize_enum_literals},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
19                canonicalize_runtime_predicate_via_bool_expr, compile_scalar_projection_expr,
20                compile_scalar_projection_plan, derive_normalized_bool_expr_predicate_subset,
21                normalize_bool_expr, projection_field_expr,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
25            lower_projection_identity, lower_projection_intent,
26            residual_query_predicate_after_access_path_bounds,
27            residual_query_predicate_after_filtered_access,
28            resolved_grouped_distinct_execution_strategy_for_model,
29        },
30        schema::SchemaInfo,
31    },
32    error::InternalError,
33    model::{
34        entity::{EntityModel, resolve_field_slot},
35        index::IndexKeyItemsRef,
36    },
37};
38
39impl QueryMode {
40    /// True if this mode represents a load intent.
41    #[must_use]
42    pub const fn is_load(&self) -> bool {
43        match self {
44            Self::Load(_) => true,
45            Self::Delete(_) => false,
46        }
47    }
48
49    /// True if this mode represents a delete intent.
50    #[must_use]
51    pub const fn is_delete(&self) -> bool {
52        match self {
53            Self::Delete(_) => true,
54            Self::Load(_) => false,
55        }
56    }
57}
58
59impl LogicalPlan {
60    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
61    #[must_use]
62    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
63        match self {
64            Self::Scalar(plan) => plan,
65            Self::Grouped(plan) => &plan.scalar,
66        }
67    }
68
69    /// Borrow scalar semantic fields mutably across logical variants for tests.
70    #[must_use]
71    #[cfg(test)]
72    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
73        match self {
74            Self::Scalar(plan) => plan,
75            Self::Grouped(plan) => &mut plan.scalar,
76        }
77    }
78
79    /// Test-only shorthand for explicit scalar semantic borrowing.
80    #[must_use]
81    #[cfg(test)]
82    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
83        self.scalar_semantics()
84    }
85
86    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
87    #[must_use]
88    #[cfg(test)]
89    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
90        self.scalar_semantics_mut()
91    }
92}
93
94impl AccessPlannedQuery {
95    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
96    #[must_use]
97    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
98        self.logical.scalar_semantics()
99    }
100
101    /// Borrow scalar semantic fields mutably across logical variants for tests.
102    #[must_use]
103    #[cfg(test)]
104    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
105        self.logical.scalar_semantics_mut()
106    }
107
108    /// Test-only shorthand for explicit scalar plan borrowing.
109    #[must_use]
110    #[cfg(test)]
111    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
112        self.scalar_plan()
113    }
114
115    /// Test-only shorthand for explicit mutable scalar plan borrowing.
116    #[must_use]
117    #[cfg(test)]
118    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
119        self.scalar_plan_mut()
120    }
121
122    /// Borrow grouped semantic fields when this plan is grouped.
123    #[must_use]
124    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
125        match &self.logical {
126            LogicalPlan::Scalar(_) => None,
127            LogicalPlan::Grouped(plan) => Some(plan),
128        }
129    }
130
131    /// Lower this plan into one canonical planner-owned projection semantic spec.
132    #[must_use]
133    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
134        if let Some(static_shape) = &self.static_planning_shape {
135            return static_shape.projection_spec.clone();
136        }
137
138        lower_projection_intent(model, &self.logical, &self.projection_selection)
139    }
140
141    /// Lower this plan into one projection semantic shape for identity hashing.
142    #[must_use]
143    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
144        lower_projection_identity(&self.logical, &self.projection_selection)
145    }
146
147    /// Return the executor-facing predicate after removing only filtered-index
148    /// guard clauses the chosen access path already proves.
149    ///
150    /// This conservative form is used by preparation/explain surfaces that
151    /// still need to see access-bound equalities as index-predicate input.
152    #[must_use]
153    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
154        if let Some(static_shape) = self.static_planning_shape.as_ref() {
155            return static_shape.execution_preparation_predicate.clone();
156        }
157
158        derive_execution_preparation_predicate(self)
159    }
160
161    /// Return the executor-facing residual predicate after removing any
162    /// filtered-index guard clauses and fixed access-bound equalities already
163    /// guaranteed by the chosen path.
164    #[must_use]
165    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
166        if let Some(static_shape) = self.static_planning_shape.as_ref() {
167            return static_shape.residual_filter_predicate.clone();
168        }
169
170        derive_residual_filter_predicate(self)
171    }
172
173    /// Return whether one explicit residual predicate survives access
174    /// planning and still participates in residual execution.
175    #[must_use]
176    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
177        self.effective_execution_predicate().is_some()
178    }
179
180    /// Borrow the planner-owned residual scalar filter expression when one
181    /// surviving semantic remainder still requires runtime evaluation.
182    #[must_use]
183    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
184        if let Some(static_shape) = self.static_planning_shape.as_ref() {
185            return static_shape.residual_filter_expr.as_ref();
186        }
187
188        if !derive_has_residual_filter(self) {
189            return None;
190        }
191
192        self.scalar_plan().filter_expr.as_ref()
193    }
194
195    /// Return whether one explicit residual scalar filter expression survives
196    /// access planning and still requires runtime evaluation.
197    #[must_use]
198    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
199        self.residual_filter_expr().is_some()
200    }
201
202    /// Borrow the planner-compiled execution-preparation predicate program.
203    #[must_use]
204    pub(in crate::db) const fn execution_preparation_compiled_predicate(
205        &self,
206    ) -> Option<&PredicateProgram> {
207        self.static_planning_shape()
208            .execution_preparation_compiled_predicate
209            .as_ref()
210    }
211
212    /// Borrow the planner-compiled effective runtime predicate program.
213    #[must_use]
214    pub(in crate::db) const fn effective_runtime_compiled_predicate(
215        &self,
216    ) -> Option<&PredicateProgram> {
217        match self
218            .static_planning_shape()
219            .effective_runtime_filter_program
220            .as_ref()
221        {
222            Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
223            Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
224        }
225    }
226
227    /// Borrow the planner-compiled effective runtime scalar filter expression.
228    #[must_use]
229    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
230        &self,
231    ) -> Option<&ScalarProjectionExpr> {
232        match self
233            .static_planning_shape()
234            .effective_runtime_filter_program
235            .as_ref()
236        {
237            Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
238            Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
239        }
240    }
241
242    /// Borrow the planner-frozen effective runtime scalar filter program.
243    #[must_use]
244    pub(in crate::db) const fn effective_runtime_filter_program(
245        &self,
246    ) -> Option<&EffectiveRuntimeFilterProgram> {
247        self.static_planning_shape()
248            .effective_runtime_filter_program
249            .as_ref()
250    }
251
252    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
253    #[must_use]
254    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
255        if !self.scalar_plan().distinct {
256            return DistinctExecutionStrategy::None;
257        }
258
259        // DISTINCT on duplicate-safe single-path access shapes is a planner
260        // no-op for runtime dedup mechanics. Composite shapes can surface
261        // duplicate keys and therefore retain explicit dedup execution.
262        match distinct_runtime_dedup_strategy(&self.access) {
263            Some(strategy) => strategy,
264            None => DistinctExecutionStrategy::None,
265        }
266    }
267
268    /// Freeze one planner-owned route profile after model validation completes.
269    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
270        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
271    }
272
273    /// Freeze planner-owned executor metadata after logical/access planning completes.
274    pub(in crate::db) fn finalize_static_planning_shape_for_model(
275        &mut self,
276        model: &EntityModel,
277    ) -> Result<(), InternalError> {
278        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
279
280        Ok(())
281    }
282
283    /// Build one immutable execution-shape signature contract for runtime layers.
284    #[must_use]
285    pub(in crate::db) fn execution_shape_signature(
286        &self,
287        entity_path: &'static str,
288    ) -> ExecutionShapeSignature {
289        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
290    }
291
292    /// Return whether the chosen access contract fully satisfies the current
293    /// scalar query predicate without any additional post-access filtering.
294    #[must_use]
295    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
296        if let Some(static_shape) = self.static_planning_shape.as_ref() {
297            return self.scalar_plan().predicate.is_some()
298                && static_shape.residual_filter_predicate.is_none()
299                && static_shape.residual_filter_expr.is_none();
300        }
301
302        derive_predicate_fully_satisfied_by_access_contract(self)
303    }
304
305    /// Borrow the planner-frozen compiled scalar projection program.
306    #[must_use]
307    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
308        self.static_planning_shape()
309            .scalar_projection_plan
310            .as_deref()
311    }
312
313    /// Borrow the planner-frozen primary-key field name.
314    #[must_use]
315    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
316        self.static_planning_shape().primary_key_name
317    }
318
319    /// Borrow the planner-frozen projection slot reachability set.
320    #[must_use]
321    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
322        self.static_planning_shape()
323            .projection_referenced_slots
324            .as_slice()
325    }
326
327    /// Borrow the planner-frozen mask for direct projected output slots.
328    #[must_use]
329    #[cfg(any(test, feature = "diagnostics"))]
330    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
331        self.static_planning_shape().projected_slot_mask.as_slice()
332    }
333
334    /// Return whether projection remains the full model-identity field list.
335    #[must_use]
336    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
337        self.static_planning_shape().projection_is_model_identity
338    }
339
340    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
341    #[must_use]
342    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
343        self.static_planning_shape()
344            .order_referenced_slots
345            .as_deref()
346    }
347
348    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
349    #[must_use]
350    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
351        self.static_planning_shape().resolved_order.as_ref()
352    }
353
354    /// Borrow the planner-frozen access slot map used by index predicate compilation.
355    #[must_use]
356    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
357        self.static_planning_shape().slot_map.as_deref()
358    }
359
360    /// Borrow grouped aggregate execution specs already resolved during static planning.
361    #[must_use]
362    pub(in crate::db) fn grouped_aggregate_execution_specs(
363        &self,
364    ) -> Option<&[GroupedAggregateExecutionSpec]> {
365        self.static_planning_shape()
366            .grouped_aggregate_execution_specs
367            .as_deref()
368    }
369
370    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
371    #[must_use]
372    pub(in crate::db) const fn grouped_distinct_execution_strategy(
373        &self,
374    ) -> Option<&GroupedDistinctExecutionStrategy> {
375        self.static_planning_shape()
376            .grouped_distinct_execution_strategy
377            .as_ref()
378    }
379
380    /// Borrow the frozen projection semantic shape without reopening model ownership.
381    #[must_use]
382    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
383        &self.static_planning_shape().projection_spec
384    }
385
386    /// Borrow the frozen direct projection slots without reopening model ownership.
387    #[must_use]
388    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
389        self.static_planning_shape()
390            .projection_direct_slots
391            .as_deref()
392    }
393
394    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
395    #[must_use]
396    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
397        self.static_planning_shape()
398            .index_compile_targets
399            .as_deref()
400    }
401
402    const fn static_planning_shape(&self) -> &StaticPlanningShape {
403        self.static_planning_shape
404            .as_ref()
405            .expect("access-planned queries must freeze static planning shape before execution")
406    }
407}
408
409fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
410    match access {
411        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
412            Some(DistinctExecutionStrategy::PreOrdered)
413        }
414        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
415            Some(DistinctExecutionStrategy::HashMaterialize)
416        }
417        AccessPlan::Path(_) => None,
418    }
419}
420
421fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
422    let is_grouped_safe = plan
423        .grouped_plan()
424        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
425
426    ContinuationPolicy::new(
427        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
428        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
429        is_grouped_safe,
430    )
431}
432
433/// Project one planner-owned route profile from the finalized logical+access plan.
434#[must_use]
435pub(in crate::db) fn project_planner_route_profile_for_model(
436    model: &EntityModel,
437    plan: &AccessPlannedQuery,
438) -> PlannerRouteProfile {
439    let secondary_order_contract = plan
440        .scalar_plan()
441        .order
442        .as_ref()
443        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
444
445    PlannerRouteProfile::new(
446        derive_continuation_policy_validated(plan),
447        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
448        secondary_order_contract,
449    )
450}
451
452fn project_static_planning_shape_for_model(
453    model: &EntityModel,
454    plan: &AccessPlannedQuery,
455) -> Result<StaticPlanningShape, InternalError> {
456    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
457    let execution_preparation_predicate = plan.execution_preparation_predicate();
458    let residual_filter_predicate = derive_residual_filter_predicate(plan);
459    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
460    let execution_preparation_compiled_predicate =
461        compile_optional_predicate(model, execution_preparation_predicate.as_ref());
462    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
463        model,
464        residual_filter_expr.as_ref(),
465        residual_filter_predicate.as_ref(),
466    )?;
467    let scalar_projection_plan =
468        if plan.grouped_plan().is_none() {
469            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
470            InternalError::query_executor_invariant(
471                "scalar projection program must compile during static planning finalization",
472            )
473        })?)
474        } else {
475            None
476        };
477    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
478        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
479    let projection_direct_slots =
480        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
481    let projection_referenced_slots =
482        projection_referenced_slots_for_spec(model, &projection_spec)?;
483    let projected_slot_mask =
484        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
485    let projection_is_model_identity =
486        projection_is_model_identity_for_spec(model, &projection_spec);
487    let resolved_order = resolved_order_for_plan(model, plan)?;
488    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
489    let slot_map = slot_map_for_model_plan(model, plan);
490    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
491
492    Ok(StaticPlanningShape {
493        primary_key_name: model.primary_key.name,
494        projection_spec,
495        execution_preparation_predicate,
496        residual_filter_expr,
497        residual_filter_predicate,
498        execution_preparation_compiled_predicate,
499        effective_runtime_filter_program,
500        scalar_projection_plan,
501        grouped_aggregate_execution_specs,
502        grouped_distinct_execution_strategy,
503        projection_direct_slots,
504        projection_referenced_slots,
505        projected_slot_mask,
506        projection_is_model_identity,
507        resolved_order,
508        order_referenced_slots,
509        slot_map,
510        index_compile_targets,
511    })
512}
513
514// Compile the executor-owned residual scalar filter contract once from the
515// planner-derived residual artifacts so runtime never has to rediscover
516// residual presence or shape from semantic/filter/pushdown state.
517fn compile_effective_runtime_filter_program(
518    model: &EntityModel,
519    residual_filter_expr: Option<&Expr>,
520    residual_filter_predicate: Option<&Predicate>,
521) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
522    // Keep the existing predicate fast path when the residual semantics still
523    // fit the derived predicate contract. The expression-owned lane is only
524    // needed once pushdown loses semantic coverage and a residual predicate no
525    // longer exists.
526    if let Some(predicate) = residual_filter_predicate {
527        return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
528            PredicateProgram::compile(model, predicate),
529        )));
530    }
531
532    if let Some(filter_expr) = residual_filter_expr {
533        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
534            InternalError::query_invalid_logical_plan(
535                "effective runtime scalar filter expression must compile during static planning finalization",
536            )
537        })?;
538
539        return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
540    }
541
542    Ok(None)
543}
544
545// Derive the executor-preparation predicate once from the selected access path.
546// This strips only filtered-index guard clauses while preserving access-bound
547// equalities that still matter to preparation/explain consumers.
548fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
549    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
550
551    match plan.access.selected_index_model() {
552        Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
553        None => Some(query_predicate.clone()),
554    }
555}
556
557// Derive the final residual predicate once from the already-filtered
558// preparation predicate plus any equality bounds guaranteed by the concrete
559// access path.
560fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
561    let filtered_residual = derive_execution_preparation_predicate(plan);
562    let filtered_residual = filtered_residual.as_ref()?;
563
564    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
565}
566
567// Derive the explicit residual semantic expression once for finalized plans.
568// The residual expression remains the planner-owned semantic filter when any
569// runtime filtering still survives access satisfaction.
570fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
571    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
572    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
573        return None;
574    }
575
576    Some(filter_expr.clone())
577}
578
579// Derive the explicit residual semantic expression during finalization using
580// the trusted entity schema so compare-family literal normalization matches the
581// planner-owned predicate contract before residual ownership is decided.
582fn derive_residual_filter_expr_for_model(
583    model: &EntityModel,
584    plan: &AccessPlannedQuery,
585) -> Option<Expr> {
586    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
587    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
588        return None;
589    }
590
591    Some(filter_expr.clone())
592}
593
594// Return whether any residual filtering survives after access planning. This
595// helper exists only for pre-finalization assembly; finalized plans must read
596// the explicit residual artifacts frozen in `StaticPlanningShape`.
597fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
598    match (
599        plan.scalar_plan().filter_expr.as_ref(),
600        plan.scalar_plan().predicate.as_ref(),
601    ) {
602        (None, None) => false,
603        (Some(_), None) => true,
604        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
605    }
606}
607
608// Return true when the planner-owned predicate contract is fully satisfied by
609// access planning and no semantic residual filter expression survives.
610fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
611    plan.scalar_plan().predicate.is_some()
612        && derive_residual_filter_predicate(plan).is_none()
613        && derive_residual_filter_expr(plan).is_none()
614}
615
616// Return true when the semantic filter expression is entirely represented by
617// the planner-owned predicate contract and the chosen access path satisfies
618// that predicate without any runtime remainder.
619fn derive_semantic_filter_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
620    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
621        return false;
622    };
623    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
624    let Some(filter_predicate) =
625        derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
626    else {
627        return false;
628    };
629    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
630        return false;
631    };
632
633    canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
634        == canonicalize_runtime_predicate_via_bool_expr(query_predicate.clone())
635}
636
637// Return true when finalized planning can prove that the semantic filter
638// expression is completely represented by the planner-owned predicate contract
639// after aligning compare literals through the trusted entity schema.
640fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
641    model: &EntityModel,
642    plan: &AccessPlannedQuery,
643) -> bool {
644    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
645        return false;
646    };
647    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
648    let Some(filter_predicate) =
649        derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
650    else {
651        return false;
652    };
653    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
654        return false;
655    };
656    let schema = SchemaInfo::cached_for_entity_model(model);
657    let Ok(filter_predicate) = normalize_enum_literals(schema, &filter_predicate) else {
658        return false;
659    };
660    let Ok(query_predicate) = normalize_enum_literals(schema, query_predicate) else {
661        return false;
662    };
663
664    canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
665        == canonicalize_runtime_predicate_via_bool_expr(query_predicate)
666}
667
668// Compile one optional planner-frozen predicate program while keeping the
669// static planning assembly path free of repeated `Option` mapping boilerplate.
670fn compile_optional_predicate(
671    model: &EntityModel,
672    predicate: Option<&Predicate>,
673) -> Option<PredicateProgram> {
674    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
675}
676
677// Resolve the grouped-only static planning semantics bundle once so grouped
678// aggregate execution specs and grouped DISTINCT strategy stay derived under
679// one shared grouped-plan branch.
680fn resolve_grouped_static_planning_semantics(
681    model: &EntityModel,
682    plan: &AccessPlannedQuery,
683    projection_spec: &ProjectionSpec,
684) -> Result<
685    (
686        Option<Vec<GroupedAggregateExecutionSpec>>,
687        Option<GroupedDistinctExecutionStrategy>,
688    ),
689    InternalError,
690> {
691    let Some(grouped) = plan.grouped_plan() else {
692        return Ok((None, None));
693    };
694
695    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
696        projection_spec,
697        grouped.group.group_fields.as_slice(),
698        grouped.group.aggregates.as_slice(),
699    )?;
700    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
701
702    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
703        model,
704        aggregate_specs.as_slice(),
705    )?);
706    let grouped_distinct_execution_strategy =
707        Some(resolved_grouped_distinct_execution_strategy_for_model(
708            model,
709            grouped.group.group_fields.as_slice(),
710            grouped.group.aggregates.as_slice(),
711            grouped.having_expr.as_ref(),
712        )?);
713
714    Ok((
715        grouped_aggregate_execution_specs,
716        grouped_distinct_execution_strategy,
717    ))
718}
719
720fn extend_grouped_having_aggregate_specs(
721    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
722    grouped: &GroupPlan,
723) -> Result<(), InternalError> {
724    if let Some(having_expr) = grouped.having_expr.as_ref() {
725        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
726    }
727
728    Ok(())
729}
730
731fn collect_grouped_having_expr_aggregate_specs(
732    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
733    expr: &Expr,
734) -> Result<(), InternalError> {
735    match expr {
736        Expr::Aggregate(aggregate_expr) => {
737            let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
738
739            if aggregate_specs
740                .iter()
741                .all(|current| current != &aggregate_spec)
742            {
743                aggregate_specs.push(aggregate_spec);
744            }
745        }
746        Expr::Field(_) | Expr::Literal(_) => {}
747        Expr::FunctionCall { args, .. } => {
748            for arg in args {
749                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
750            }
751        }
752        Expr::Unary { expr, .. } => {
753            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
754        }
755        Expr::Case {
756            when_then_arms,
757            else_expr,
758        } => {
759            for arm in when_then_arms {
760                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
761                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
762            }
763
764            collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
765        }
766        Expr::Binary { left, right, .. } => {
767            collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
768            collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
769        }
770        #[cfg(test)]
771        Expr::Alias { expr, .. } => {
772            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
773        }
774    }
775
776    Ok(())
777}
778
779fn projection_referenced_slots_for_spec(
780    model: &EntityModel,
781    projection: &ProjectionSpec,
782) -> Result<Vec<usize>, InternalError> {
783    let mut referenced = vec![false; model.fields().len()];
784
785    for field in projection.fields() {
786        mark_projection_expr_slots(
787            model,
788            projection_field_expr(field),
789            referenced.as_mut_slice(),
790        )?;
791    }
792
793    Ok(referenced
794        .into_iter()
795        .enumerate()
796        .filter_map(|(slot, required)| required.then_some(slot))
797        .collect())
798}
799
800fn mark_projection_expr_slots(
801    model: &EntityModel,
802    expr: &Expr,
803    referenced: &mut [bool],
804) -> Result<(), InternalError> {
805    match expr {
806        Expr::Field(field_id) => {
807            let field_name = field_id.as_str();
808            let slot = resolve_required_field_slot(model, field_name, || {
809                InternalError::query_invalid_logical_plan(format!(
810                    "projection expression references unknown field '{field_name}'",
811                ))
812            })?;
813            referenced[slot] = true;
814        }
815        Expr::Literal(_) => {}
816        Expr::FunctionCall { args, .. } => {
817            for arg in args {
818                mark_projection_expr_slots(model, arg, referenced)?;
819            }
820        }
821        Expr::Case {
822            when_then_arms,
823            else_expr,
824        } => {
825            for arm in when_then_arms {
826                mark_projection_expr_slots(model, arm.condition(), referenced)?;
827                mark_projection_expr_slots(model, arm.result(), referenced)?;
828            }
829            mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
830        }
831        Expr::Aggregate(_) => {}
832        #[cfg(test)]
833        Expr::Alias { expr, .. } => {
834            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
835        }
836        Expr::Unary { expr, .. } => {
837            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
838        }
839        Expr::Binary { left, right, .. } => {
840            mark_projection_expr_slots(model, left.as_ref(), referenced)?;
841            mark_projection_expr_slots(model, right.as_ref(), referenced)?;
842        }
843    }
844
845    Ok(())
846}
847
848fn projected_slot_mask_for_spec(
849    model: &EntityModel,
850    direct_projection_slots: Option<&[usize]>,
851) -> Vec<bool> {
852    let mut projected_slots = vec![false; model.fields().len()];
853
854    let Some(direct_projection_slots) = direct_projection_slots else {
855        return projected_slots;
856    };
857
858    for slot in direct_projection_slots.iter().copied() {
859        if let Some(projected) = projected_slots.get_mut(slot) {
860            *projected = true;
861        }
862    }
863
864    projected_slots
865}
866
867fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
868    if projection.len() != model.fields().len() {
869        return false;
870    }
871
872    for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
873        match projected_field {
874            ProjectionField::Scalar {
875                expr: Expr::Field(field_id),
876                alias: None,
877            } if field_id.as_str() == field_model.name() => {}
878            ProjectionField::Scalar { .. } => return false,
879        }
880    }
881
882    true
883}
884
885fn resolved_order_for_plan(
886    model: &EntityModel,
887    plan: &AccessPlannedQuery,
888) -> Result<Option<ResolvedOrder>, InternalError> {
889    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
890        return Ok(None);
891    }
892
893    let Some(order) = plan.scalar_plan().order.as_ref() else {
894        return Ok(None);
895    };
896
897    let mut fields = Vec::with_capacity(order.fields.len());
898    for term in &order.fields {
899        fields.push(ResolvedOrderField::new(
900            resolved_order_value_source_for_term(model, term)?,
901            term.direction(),
902        ));
903    }
904
905    Ok(Some(ResolvedOrder::new(fields)))
906}
907
908fn resolved_order_value_source_for_term(
909    model: &EntityModel,
910    term: &crate::db::query::plan::OrderTerm,
911) -> Result<ResolvedOrderValueSource, InternalError> {
912    if term.direct_field().is_none() {
913        let rendered = term.rendered_label();
914        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
915        let compiled = compile_scalar_projection_expr(model, term.expr())
916            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
917
918        return Ok(ResolvedOrderValueSource::expression(compiled));
919    }
920
921    let field = term
922        .direct_field()
923        .expect("direct-field order branch should only execute for field-backed terms");
924    let slot = resolve_required_field_slot(model, field, || {
925        InternalError::query_invalid_logical_plan(format!(
926            "order expression references unknown field '{field}'",
927        ))
928    })?;
929
930    Ok(ResolvedOrderValueSource::direct_field(slot))
931}
932
933fn validate_resolved_order_expr_fields(
934    model: &EntityModel,
935    expr: &Expr,
936    rendered: &str,
937) -> Result<(), InternalError> {
938    match expr {
939        Expr::Field(field_id) => {
940            resolve_required_field_slot(model, field_id.as_str(), || {
941                InternalError::query_invalid_logical_plan(format!(
942                    "order expression references unknown field '{rendered}'",
943                ))
944            })?;
945        }
946        Expr::Literal(_) => {}
947        Expr::FunctionCall { args, .. } => {
948            for arg in args {
949                validate_resolved_order_expr_fields(model, arg, rendered)?;
950            }
951        }
952        Expr::Case {
953            when_then_arms,
954            else_expr,
955        } => {
956            for arm in when_then_arms {
957                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
958                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
959            }
960            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
961        }
962        Expr::Binary { left, right, .. } => {
963            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
964            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
965        }
966        Expr::Aggregate(_) => {
967            return Err(order_expression_scalar_seam_error(rendered));
968        }
969        #[cfg(test)]
970        Expr::Alias { .. } => {
971            return Err(order_expression_scalar_seam_error(rendered));
972        }
973        Expr::Unary { .. } => {
974            return Err(order_expression_scalar_seam_error(rendered));
975        }
976    }
977
978    Ok(())
979}
980
981// Resolve one model field slot while keeping planner invalid-logical-plan
982// error construction at the callsite that owns the diagnostic wording.
983fn resolve_required_field_slot<F>(
984    model: &EntityModel,
985    field: &str,
986    invalid_plan_error: F,
987) -> Result<usize, InternalError>
988where
989    F: FnOnce() -> InternalError,
990{
991    resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
992}
993
994// Keep the scalar-order expression seam violation text under one helper so the
995// parse validation and compile validation paths do not drift.
996fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
997    InternalError::query_invalid_logical_plan(format!(
998        "order expression '{rendered}' did not stay on the scalar expression seam",
999    ))
1000}
1001
1002// Keep one stable executor-facing slot list for grouped order terms after the
1003// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
1004// now consumes this same referenced-slot contract instead of re-deriving order
1005// sources from planner strategy at runtime.
1006fn order_referenced_slots_for_resolved_order(
1007    resolved_order: Option<&ResolvedOrder>,
1008) -> Option<Vec<usize>> {
1009    let resolved_order = resolved_order?;
1010    let mut referenced = Vec::new();
1011
1012    // Keep one stable slot list without re-parsing order expressions after the
1013    // planner has already frozen structural ORDER BY sources.
1014    for field in resolved_order.fields() {
1015        field.source().extend_referenced_slots(&mut referenced);
1016    }
1017
1018    Some(referenced)
1019}
1020
1021fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
1022    let access_strategy = plan.access.resolve_strategy();
1023    let executable = access_strategy.executable();
1024
1025    resolved_index_slots_for_access_path(model, executable)
1026}
1027
1028fn resolved_index_slots_for_access_path(
1029    model: &EntityModel,
1030    access: &ExecutableAccessPlan<'_, crate::value::Value>,
1031) -> Option<Vec<usize>> {
1032    let path = access.as_path()?;
1033    let path_capabilities = path.capabilities();
1034    let index_fields = path_capabilities.index_fields_for_slot_map()?;
1035    let mut slots = Vec::with_capacity(index_fields.len());
1036
1037    for field_name in index_fields {
1038        let slot = resolve_field_slot(model, field_name)?;
1039        slots.push(slot);
1040    }
1041
1042    Some(slots)
1043}
1044
1045fn index_compile_targets_for_model_plan(
1046    model: &EntityModel,
1047    plan: &AccessPlannedQuery,
1048) -> Option<Vec<IndexCompileTarget>> {
1049    let index = plan.access.as_path()?.selected_index_model()?;
1050    let mut targets = Vec::new();
1051
1052    match index.key_items() {
1053        IndexKeyItemsRef::Fields(fields) => {
1054            for (component_index, &field_name) in fields.iter().enumerate() {
1055                let field_slot = resolve_field_slot(model, field_name)?;
1056                targets.push(IndexCompileTarget {
1057                    component_index,
1058                    field_slot,
1059                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
1060                });
1061            }
1062        }
1063        IndexKeyItemsRef::Items(items) => {
1064            for (component_index, &key_item) in items.iter().enumerate() {
1065                let field_slot = resolve_field_slot(model, key_item.field())?;
1066                targets.push(IndexCompileTarget {
1067                    component_index,
1068                    field_slot,
1069                    key_item,
1070                });
1071            }
1072        }
1073    }
1074
1075    Some(targets)
1076}