Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{
10            IndexCompileTarget, Predicate, PredicateProgram, canonicalize_predicate_via_bool_expr,
11            derive_bool_expr_predicate_subset, normalize_bool_expr, normalize_enum_literals,
12        },
13        query::plan::{
14            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
15            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
16            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
17            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
18            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
19            derive_logical_pushdown_eligibility,
20            expr::{
21                Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
22                compile_scalar_projection_expr, compile_scalar_projection_plan,
23                projection_field_expr,
24            },
25            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
26            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
27            lower_projection_identity, lower_projection_intent,
28            residual_query_predicate_after_access_path_bounds,
29            residual_query_predicate_after_filtered_access,
30            resolved_grouped_distinct_execution_strategy_for_model,
31        },
32        schema::SchemaInfo,
33    },
34    error::InternalError,
35    model::{
36        entity::{EntityModel, resolve_field_slot},
37        index::IndexKeyItemsRef,
38    },
39};
40
41impl QueryMode {
42    /// True if this mode represents a load intent.
43    #[must_use]
44    pub const fn is_load(&self) -> bool {
45        match self {
46            Self::Load(_) => true,
47            Self::Delete(_) => false,
48        }
49    }
50
51    /// True if this mode represents a delete intent.
52    #[must_use]
53    pub const fn is_delete(&self) -> bool {
54        match self {
55            Self::Delete(_) => true,
56            Self::Load(_) => false,
57        }
58    }
59}
60
61impl LogicalPlan {
62    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
63    #[must_use]
64    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65        match self {
66            Self::Scalar(plan) => plan,
67            Self::Grouped(plan) => &plan.scalar,
68        }
69    }
70
71    /// Borrow scalar semantic fields mutably across logical variants for tests.
72    #[must_use]
73    #[cfg(test)]
74    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75        match self {
76            Self::Scalar(plan) => plan,
77            Self::Grouped(plan) => &mut plan.scalar,
78        }
79    }
80
81    /// Test-only shorthand for explicit scalar semantic borrowing.
82    #[must_use]
83    #[cfg(test)]
84    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85        self.scalar_semantics()
86    }
87
88    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
89    #[must_use]
90    #[cfg(test)]
91    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92        self.scalar_semantics_mut()
93    }
94}
95
96impl AccessPlannedQuery {
97    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
98    #[must_use]
99    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100        self.logical.scalar_semantics()
101    }
102
103    /// Borrow scalar semantic fields mutably across logical variants for tests.
104    #[must_use]
105    #[cfg(test)]
106    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
107        self.logical.scalar_semantics_mut()
108    }
109
110    /// Test-only shorthand for explicit scalar plan borrowing.
111    #[must_use]
112    #[cfg(test)]
113    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
114        self.scalar_plan()
115    }
116
117    /// Test-only shorthand for explicit mutable scalar plan borrowing.
118    #[must_use]
119    #[cfg(test)]
120    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
121        self.scalar_plan_mut()
122    }
123
124    /// Borrow grouped semantic fields when this plan is grouped.
125    #[must_use]
126    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
127        match &self.logical {
128            LogicalPlan::Scalar(_) => None,
129            LogicalPlan::Grouped(plan) => Some(plan),
130        }
131    }
132
133    /// Lower this plan into one canonical planner-owned projection semantic spec.
134    #[must_use]
135    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
136        if let Some(static_shape) = &self.static_planning_shape {
137            return static_shape.projection_spec.clone();
138        }
139
140        lower_projection_intent(model, &self.logical, &self.projection_selection)
141    }
142
143    /// Lower this plan into one projection semantic shape for identity hashing.
144    #[must_use]
145    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
146        lower_projection_identity(&self.logical)
147    }
148
149    /// Return the executor-facing predicate after removing only filtered-index
150    /// guard clauses the chosen access path already proves.
151    ///
152    /// This conservative form is used by preparation/explain surfaces that
153    /// still need to see access-bound equalities as index-predicate input.
154    #[must_use]
155    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
156        if let Some(static_shape) = self.static_planning_shape.as_ref() {
157            return static_shape.execution_preparation_predicate.clone();
158        }
159
160        derive_execution_preparation_predicate(self)
161    }
162
163    /// Return the executor-facing residual predicate after removing any
164    /// filtered-index guard clauses and fixed access-bound equalities already
165    /// guaranteed by the chosen path.
166    #[must_use]
167    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
168        if let Some(static_shape) = self.static_planning_shape.as_ref() {
169            return static_shape.residual_filter_predicate.clone();
170        }
171
172        derive_residual_filter_predicate(self)
173    }
174
175    /// Return whether one explicit residual predicate survives access
176    /// planning and still participates in residual execution.
177    #[must_use]
178    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
179        self.effective_execution_predicate().is_some()
180    }
181
182    /// Borrow the planner-owned residual scalar filter expression when one
183    /// surviving semantic remainder still requires runtime evaluation.
184    #[must_use]
185    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
186        if let Some(static_shape) = self.static_planning_shape.as_ref() {
187            return static_shape.residual_filter_expr.as_ref();
188        }
189
190        if !derive_has_residual_filter(self) {
191            return None;
192        }
193
194        self.scalar_plan().filter_expr.as_ref()
195    }
196
197    /// Return whether one explicit residual scalar filter expression survives
198    /// access planning and still requires runtime evaluation.
199    #[must_use]
200    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
201        self.residual_filter_expr().is_some()
202    }
203
204    /// Borrow the planner-compiled execution-preparation predicate program.
205    #[must_use]
206    pub(in crate::db) const fn execution_preparation_compiled_predicate(
207        &self,
208    ) -> Option<&PredicateProgram> {
209        self.static_planning_shape()
210            .execution_preparation_compiled_predicate
211            .as_ref()
212    }
213
214    /// Borrow the planner-compiled effective runtime predicate program.
215    #[must_use]
216    pub(in crate::db) const fn effective_runtime_compiled_predicate(
217        &self,
218    ) -> Option<&PredicateProgram> {
219        match self
220            .static_planning_shape()
221            .effective_runtime_filter_program
222            .as_ref()
223        {
224            Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
225            Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
226        }
227    }
228
229    /// Borrow the planner-compiled effective runtime scalar filter expression.
230    #[must_use]
231    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
232        &self,
233    ) -> Option<&ScalarProjectionExpr> {
234        match self
235            .static_planning_shape()
236            .effective_runtime_filter_program
237            .as_ref()
238        {
239            Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
240            Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
241        }
242    }
243
244    /// Borrow the planner-frozen effective runtime scalar filter program.
245    #[must_use]
246    pub(in crate::db) const fn effective_runtime_filter_program(
247        &self,
248    ) -> Option<&EffectiveRuntimeFilterProgram> {
249        self.static_planning_shape()
250            .effective_runtime_filter_program
251            .as_ref()
252    }
253
254    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
255    #[must_use]
256    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
257        if !self.scalar_plan().distinct {
258            return DistinctExecutionStrategy::None;
259        }
260
261        // DISTINCT on duplicate-safe single-path access shapes is a planner
262        // no-op for runtime dedup mechanics. Composite shapes can surface
263        // duplicate keys and therefore retain explicit dedup execution.
264        match distinct_runtime_dedup_strategy(&self.access) {
265            Some(strategy) => strategy,
266            None => DistinctExecutionStrategy::None,
267        }
268    }
269
270    /// Freeze one planner-owned route profile after model validation completes.
271    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
272        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
273    }
274
275    /// Freeze planner-owned executor metadata after logical/access planning completes.
276    pub(in crate::db) fn finalize_static_planning_shape_for_model(
277        &mut self,
278        model: &EntityModel,
279    ) -> Result<(), InternalError> {
280        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
281
282        Ok(())
283    }
284
285    /// Build one immutable execution-shape signature contract for runtime layers.
286    #[must_use]
287    pub(in crate::db) fn execution_shape_signature(
288        &self,
289        entity_path: &'static str,
290    ) -> ExecutionShapeSignature {
291        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
292    }
293
294    /// Return whether the chosen access contract fully satisfies the current
295    /// scalar query predicate without any additional post-access filtering.
296    #[must_use]
297    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
298        if let Some(static_shape) = self.static_planning_shape.as_ref() {
299            return self.scalar_plan().predicate.is_some()
300                && static_shape.residual_filter_predicate.is_none()
301                && static_shape.residual_filter_expr.is_none();
302        }
303
304        derive_predicate_fully_satisfied_by_access_contract(self)
305    }
306
307    /// Borrow the planner-frozen compiled scalar projection program.
308    #[must_use]
309    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
310        self.static_planning_shape()
311            .scalar_projection_plan
312            .as_deref()
313    }
314
315    /// Borrow the planner-frozen primary-key field name.
316    #[must_use]
317    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
318        self.static_planning_shape().primary_key_name
319    }
320
321    /// Borrow the planner-frozen projection slot reachability set.
322    #[must_use]
323    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
324        self.static_planning_shape()
325            .projection_referenced_slots
326            .as_slice()
327    }
328
329    /// Borrow the planner-frozen mask for direct projected output slots.
330    #[must_use]
331    #[cfg(any(test, feature = "diagnostics"))]
332    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
333        self.static_planning_shape().projected_slot_mask.as_slice()
334    }
335
336    /// Return whether projection remains the full model-identity field list.
337    #[must_use]
338    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
339        self.static_planning_shape().projection_is_model_identity
340    }
341
342    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
343    #[must_use]
344    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
345        self.static_planning_shape()
346            .order_referenced_slots
347            .as_deref()
348    }
349
350    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
351    #[must_use]
352    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
353        self.static_planning_shape().resolved_order.as_ref()
354    }
355
356    /// Borrow the planner-frozen access slot map used by index predicate compilation.
357    #[must_use]
358    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
359        self.static_planning_shape().slot_map.as_deref()
360    }
361
362    /// Borrow grouped aggregate execution specs already resolved during static planning.
363    #[must_use]
364    pub(in crate::db) fn grouped_aggregate_execution_specs(
365        &self,
366    ) -> Option<&[GroupedAggregateExecutionSpec]> {
367        self.static_planning_shape()
368            .grouped_aggregate_execution_specs
369            .as_deref()
370    }
371
372    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
373    #[must_use]
374    pub(in crate::db) const fn grouped_distinct_execution_strategy(
375        &self,
376    ) -> Option<&GroupedDistinctExecutionStrategy> {
377        self.static_planning_shape()
378            .grouped_distinct_execution_strategy
379            .as_ref()
380    }
381
382    /// Borrow the frozen projection semantic shape without reopening model ownership.
383    #[must_use]
384    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
385        &self.static_planning_shape().projection_spec
386    }
387
388    /// Borrow the frozen direct projection slots without reopening model ownership.
389    #[must_use]
390    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
391        self.static_planning_shape()
392            .projection_direct_slots
393            .as_deref()
394    }
395
396    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
397    #[must_use]
398    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
399        self.static_planning_shape()
400            .index_compile_targets
401            .as_deref()
402    }
403
404    const fn static_planning_shape(&self) -> &StaticPlanningShape {
405        self.static_planning_shape
406            .as_ref()
407            .expect("access-planned queries must freeze static planning shape before execution")
408    }
409}
410
411fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
412    match access {
413        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
414            Some(DistinctExecutionStrategy::PreOrdered)
415        }
416        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
417            Some(DistinctExecutionStrategy::HashMaterialize)
418        }
419        AccessPlan::Path(_) => None,
420    }
421}
422
423fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
424    let is_grouped_safe = plan
425        .grouped_plan()
426        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
427
428    ContinuationPolicy::new(
429        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
430        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
431        is_grouped_safe,
432    )
433}
434
435/// Project one planner-owned route profile from the finalized logical+access plan.
436#[must_use]
437pub(in crate::db) fn project_planner_route_profile_for_model(
438    model: &EntityModel,
439    plan: &AccessPlannedQuery,
440) -> PlannerRouteProfile {
441    let secondary_order_contract = plan
442        .scalar_plan()
443        .order
444        .as_ref()
445        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
446
447    PlannerRouteProfile::new(
448        derive_continuation_policy_validated(plan),
449        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
450        secondary_order_contract,
451    )
452}
453
454fn project_static_planning_shape_for_model(
455    model: &EntityModel,
456    plan: &AccessPlannedQuery,
457) -> Result<StaticPlanningShape, InternalError> {
458    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
459    let execution_preparation_predicate = plan.execution_preparation_predicate();
460    let residual_filter_predicate = derive_residual_filter_predicate(plan);
461    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
462    let execution_preparation_compiled_predicate =
463        compile_optional_predicate(model, execution_preparation_predicate.as_ref());
464    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
465        model,
466        residual_filter_expr.as_ref(),
467        residual_filter_predicate.as_ref(),
468    )?;
469    let scalar_projection_plan =
470        if plan.grouped_plan().is_none() {
471            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
472            InternalError::query_executor_invariant(
473                "scalar projection program must compile during static planning finalization",
474            )
475        })?)
476        } else {
477            None
478        };
479    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
480        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
481    let projection_direct_slots =
482        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
483    let projection_referenced_slots =
484        projection_referenced_slots_for_spec(model, &projection_spec)?;
485    let projected_slot_mask =
486        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
487    let projection_is_model_identity =
488        projection_is_model_identity_for_spec(model, &projection_spec);
489    let resolved_order = resolved_order_for_plan(model, plan)?;
490    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
491    let slot_map = slot_map_for_model_plan(model, plan);
492    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
493
494    Ok(StaticPlanningShape {
495        primary_key_name: model.primary_key.name,
496        projection_spec,
497        execution_preparation_predicate,
498        residual_filter_expr,
499        residual_filter_predicate,
500        execution_preparation_compiled_predicate,
501        effective_runtime_filter_program,
502        scalar_projection_plan,
503        grouped_aggregate_execution_specs,
504        grouped_distinct_execution_strategy,
505        projection_direct_slots,
506        projection_referenced_slots,
507        projected_slot_mask,
508        projection_is_model_identity,
509        resolved_order,
510        order_referenced_slots,
511        slot_map,
512        index_compile_targets,
513    })
514}
515
516// Compile the executor-owned residual scalar filter contract once from the
517// planner-derived residual artifacts so runtime never has to rediscover
518// residual presence or shape from semantic/filter/pushdown state.
519fn compile_effective_runtime_filter_program(
520    model: &EntityModel,
521    residual_filter_expr: Option<&Expr>,
522    residual_filter_predicate: Option<&Predicate>,
523) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
524    // Keep the existing predicate fast path when the residual semantics still
525    // fit the derived predicate contract. The expression-owned lane is only
526    // needed once pushdown loses semantic coverage and a residual predicate no
527    // longer exists.
528    if let Some(predicate) = residual_filter_predicate {
529        return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
530            PredicateProgram::compile(model, predicate),
531        )));
532    }
533
534    if let Some(filter_expr) = residual_filter_expr {
535        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
536            InternalError::query_invalid_logical_plan(
537                "effective runtime scalar filter expression must compile during static planning finalization",
538            )
539        })?;
540
541        return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
542    }
543
544    Ok(None)
545}
546
547// Derive the executor-preparation predicate once from the selected access path.
548// This strips only filtered-index guard clauses while preserving access-bound
549// equalities that still matter to preparation/explain consumers.
550fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
551    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
552
553    match plan.access.selected_index_model() {
554        Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
555        None => Some(query_predicate.clone()),
556    }
557}
558
559// Derive the final residual predicate once from the already-filtered
560// preparation predicate plus any equality bounds guaranteed by the concrete
561// access path.
562fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
563    let filtered_residual = derive_execution_preparation_predicate(plan);
564    let filtered_residual = filtered_residual.as_ref()?;
565
566    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
567}
568
569// Derive the explicit residual semantic expression once for finalized plans.
570// The residual expression remains the planner-owned semantic filter when any
571// runtime filtering still survives access satisfaction.
572fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
573    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
574    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
575        return None;
576    }
577
578    Some(filter_expr.clone())
579}
580
581// Derive the explicit residual semantic expression during finalization using
582// the trusted entity schema so compare-family literal normalization matches the
583// planner-owned predicate contract before residual ownership is decided.
584fn derive_residual_filter_expr_for_model(
585    model: &EntityModel,
586    plan: &AccessPlannedQuery,
587) -> Option<Expr> {
588    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
589    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
590        return None;
591    }
592
593    Some(filter_expr.clone())
594}
595
596// Return whether any residual filtering survives after access planning. This
597// helper exists only for pre-finalization assembly; finalized plans must read
598// the explicit residual artifacts frozen in `StaticPlanningShape`.
599fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
600    match (
601        plan.scalar_plan().filter_expr.as_ref(),
602        plan.scalar_plan().predicate.as_ref(),
603    ) {
604        (None, None) => false,
605        (Some(_), None) => true,
606        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
607    }
608}
609
610// Return true when the planner-owned predicate contract is fully satisfied by
611// access planning and no semantic residual filter expression survives.
612fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
613    plan.scalar_plan().predicate.is_some()
614        && derive_residual_filter_predicate(plan).is_none()
615        && derive_residual_filter_expr(plan).is_none()
616}
617
618// Return true when the semantic filter expression is entirely represented by
619// the planner-owned predicate contract and the chosen access path satisfies
620// that predicate without any runtime remainder.
621fn derive_semantic_filter_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
622    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
623        return false;
624    };
625    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
626    let Some(filter_predicate) = derive_bool_expr_predicate_subset(&normalized_filter_expr) else {
627        return false;
628    };
629    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
630        return false;
631    };
632
633    canonicalize_predicate_via_bool_expr(filter_predicate)
634        == canonicalize_predicate_via_bool_expr(query_predicate.clone())
635}
636
637// Return true when finalized planning can prove that the semantic filter
638// expression is completely represented by the planner-owned predicate contract
639// after aligning compare literals through the trusted entity schema.
640fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
641    model: &EntityModel,
642    plan: &AccessPlannedQuery,
643) -> bool {
644    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
645        return false;
646    };
647    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
648    let Some(filter_predicate) = derive_bool_expr_predicate_subset(&normalized_filter_expr) else {
649        return false;
650    };
651    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
652        return false;
653    };
654    let schema = SchemaInfo::cached_for_entity_model(model);
655    let Ok(filter_predicate) = normalize_enum_literals(schema, &filter_predicate) else {
656        return false;
657    };
658    let Ok(query_predicate) = normalize_enum_literals(schema, query_predicate) else {
659        return false;
660    };
661
662    canonicalize_predicate_via_bool_expr(filter_predicate)
663        == canonicalize_predicate_via_bool_expr(query_predicate)
664}
665
666// Compile one optional planner-frozen predicate program while keeping the
667// static planning assembly path free of repeated `Option` mapping boilerplate.
668fn compile_optional_predicate(
669    model: &EntityModel,
670    predicate: Option<&Predicate>,
671) -> Option<PredicateProgram> {
672    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
673}
674
675// Resolve the grouped-only static planning semantics bundle once so grouped
676// aggregate execution specs and grouped DISTINCT strategy stay derived under
677// one shared grouped-plan branch.
678fn resolve_grouped_static_planning_semantics(
679    model: &EntityModel,
680    plan: &AccessPlannedQuery,
681    projection_spec: &ProjectionSpec,
682) -> Result<
683    (
684        Option<Vec<GroupedAggregateExecutionSpec>>,
685        Option<GroupedDistinctExecutionStrategy>,
686    ),
687    InternalError,
688> {
689    let Some(grouped) = plan.grouped_plan() else {
690        return Ok((None, None));
691    };
692
693    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
694        projection_spec,
695        grouped.group.group_fields.as_slice(),
696        grouped.group.aggregates.as_slice(),
697    )?;
698    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
699
700    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
701        model,
702        aggregate_specs.as_slice(),
703    )?);
704    let grouped_distinct_execution_strategy =
705        Some(resolved_grouped_distinct_execution_strategy_for_model(
706            model,
707            grouped.group.group_fields.as_slice(),
708            grouped.group.aggregates.as_slice(),
709            grouped.having_expr.as_ref(),
710        )?);
711
712    Ok((
713        grouped_aggregate_execution_specs,
714        grouped_distinct_execution_strategy,
715    ))
716}
717
718fn extend_grouped_having_aggregate_specs(
719    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
720    grouped: &GroupPlan,
721) -> Result<(), InternalError> {
722    if let Some(having_expr) = grouped.having_expr.as_ref() {
723        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
724    }
725
726    Ok(())
727}
728
729fn collect_grouped_having_expr_aggregate_specs(
730    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
731    expr: &Expr,
732) -> Result<(), InternalError> {
733    match expr {
734        Expr::Aggregate(aggregate_expr) => {
735            let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
736
737            if aggregate_specs
738                .iter()
739                .all(|current| current != &aggregate_spec)
740            {
741                aggregate_specs.push(aggregate_spec);
742            }
743        }
744        Expr::Field(_) | Expr::Literal(_) => {}
745        Expr::FunctionCall { args, .. } => {
746            for arg in args {
747                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
748            }
749        }
750        Expr::Unary { expr, .. } => {
751            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
752        }
753        Expr::Case {
754            when_then_arms,
755            else_expr,
756        } => {
757            for arm in when_then_arms {
758                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
759                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
760            }
761
762            collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
763        }
764        Expr::Binary { left, right, .. } => {
765            collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
766            collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
767        }
768        #[cfg(test)]
769        Expr::Alias { expr, .. } => {
770            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
771        }
772    }
773
774    Ok(())
775}
776
777fn projection_referenced_slots_for_spec(
778    model: &EntityModel,
779    projection: &ProjectionSpec,
780) -> Result<Vec<usize>, InternalError> {
781    let mut referenced = vec![false; model.fields().len()];
782
783    for field in projection.fields() {
784        mark_projection_expr_slots(
785            model,
786            projection_field_expr(field),
787            referenced.as_mut_slice(),
788        )?;
789    }
790
791    Ok(referenced
792        .into_iter()
793        .enumerate()
794        .filter_map(|(slot, required)| required.then_some(slot))
795        .collect())
796}
797
798fn mark_projection_expr_slots(
799    model: &EntityModel,
800    expr: &Expr,
801    referenced: &mut [bool],
802) -> Result<(), InternalError> {
803    match expr {
804        Expr::Field(field_id) => {
805            let field_name = field_id.as_str();
806            let slot = resolve_required_field_slot(model, field_name, || {
807                InternalError::query_invalid_logical_plan(format!(
808                    "projection expression references unknown field '{field_name}'",
809                ))
810            })?;
811            referenced[slot] = true;
812        }
813        Expr::Literal(_) => {}
814        Expr::FunctionCall { args, .. } => {
815            for arg in args {
816                mark_projection_expr_slots(model, arg, referenced)?;
817            }
818        }
819        Expr::Case {
820            when_then_arms,
821            else_expr,
822        } => {
823            for arm in when_then_arms {
824                mark_projection_expr_slots(model, arm.condition(), referenced)?;
825                mark_projection_expr_slots(model, arm.result(), referenced)?;
826            }
827            mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
828        }
829        Expr::Aggregate(_) => {}
830        #[cfg(test)]
831        Expr::Alias { expr, .. } => {
832            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
833        }
834        Expr::Unary { expr, .. } => {
835            mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
836        }
837        Expr::Binary { left, right, .. } => {
838            mark_projection_expr_slots(model, left.as_ref(), referenced)?;
839            mark_projection_expr_slots(model, right.as_ref(), referenced)?;
840        }
841    }
842
843    Ok(())
844}
845
846fn projected_slot_mask_for_spec(
847    model: &EntityModel,
848    direct_projection_slots: Option<&[usize]>,
849) -> Vec<bool> {
850    let mut projected_slots = vec![false; model.fields().len()];
851
852    let Some(direct_projection_slots) = direct_projection_slots else {
853        return projected_slots;
854    };
855
856    for slot in direct_projection_slots.iter().copied() {
857        if let Some(projected) = projected_slots.get_mut(slot) {
858            *projected = true;
859        }
860    }
861
862    projected_slots
863}
864
865fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
866    if projection.len() != model.fields().len() {
867        return false;
868    }
869
870    for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
871        match projected_field {
872            ProjectionField::Scalar {
873                expr: Expr::Field(field_id),
874                alias: None,
875            } if field_id.as_str() == field_model.name() => {}
876            ProjectionField::Scalar { .. } => return false,
877        }
878    }
879
880    true
881}
882
883fn resolved_order_for_plan(
884    model: &EntityModel,
885    plan: &AccessPlannedQuery,
886) -> Result<Option<ResolvedOrder>, InternalError> {
887    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
888        return Ok(None);
889    }
890
891    let Some(order) = plan.scalar_plan().order.as_ref() else {
892        return Ok(None);
893    };
894
895    let mut fields = Vec::with_capacity(order.fields.len());
896    for term in &order.fields {
897        fields.push(ResolvedOrderField::new(
898            resolved_order_value_source_for_term(model, term)?,
899            term.direction(),
900        ));
901    }
902
903    Ok(Some(ResolvedOrder::new(fields)))
904}
905
906fn resolved_order_value_source_for_term(
907    model: &EntityModel,
908    term: &crate::db::query::plan::OrderTerm,
909) -> Result<ResolvedOrderValueSource, InternalError> {
910    if term.direct_field().is_none() {
911        let rendered = term.rendered_label();
912        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
913        let compiled = compile_scalar_projection_expr(model, term.expr())
914            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
915
916        return Ok(ResolvedOrderValueSource::expression(compiled));
917    }
918
919    let field = term
920        .direct_field()
921        .expect("direct-field order branch should only execute for field-backed terms");
922    let slot = resolve_required_field_slot(model, field, || {
923        InternalError::query_invalid_logical_plan(format!(
924            "order expression references unknown field '{field}'",
925        ))
926    })?;
927
928    Ok(ResolvedOrderValueSource::direct_field(slot))
929}
930
931fn validate_resolved_order_expr_fields(
932    model: &EntityModel,
933    expr: &Expr,
934    rendered: &str,
935) -> Result<(), InternalError> {
936    match expr {
937        Expr::Field(field_id) => {
938            resolve_required_field_slot(model, field_id.as_str(), || {
939                InternalError::query_invalid_logical_plan(format!(
940                    "order expression references unknown field '{rendered}'",
941                ))
942            })?;
943        }
944        Expr::Literal(_) => {}
945        Expr::FunctionCall { args, .. } => {
946            for arg in args {
947                validate_resolved_order_expr_fields(model, arg, rendered)?;
948            }
949        }
950        Expr::Case {
951            when_then_arms,
952            else_expr,
953        } => {
954            for arm in when_then_arms {
955                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
956                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
957            }
958            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
959        }
960        Expr::Binary { left, right, .. } => {
961            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
962            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
963        }
964        Expr::Aggregate(_) => {
965            return Err(order_expression_scalar_seam_error(rendered));
966        }
967        #[cfg(test)]
968        Expr::Alias { .. } => {
969            return Err(order_expression_scalar_seam_error(rendered));
970        }
971        Expr::Unary { .. } => {
972            return Err(order_expression_scalar_seam_error(rendered));
973        }
974    }
975
976    Ok(())
977}
978
979// Resolve one model field slot while keeping planner invalid-logical-plan
980// error construction at the callsite that owns the diagnostic wording.
981fn resolve_required_field_slot<F>(
982    model: &EntityModel,
983    field: &str,
984    invalid_plan_error: F,
985) -> Result<usize, InternalError>
986where
987    F: FnOnce() -> InternalError,
988{
989    resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
990}
991
992// Keep the scalar-order expression seam violation text under one helper so the
993// parse validation and compile validation paths do not drift.
994fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
995    InternalError::query_invalid_logical_plan(format!(
996        "order expression '{rendered}' did not stay on the scalar expression seam",
997    ))
998}
999
1000// Keep one stable executor-facing slot list for grouped order terms after the
1001// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
1002// now consumes this same referenced-slot contract instead of re-deriving order
1003// sources from planner strategy at runtime.
1004fn order_referenced_slots_for_resolved_order(
1005    resolved_order: Option<&ResolvedOrder>,
1006) -> Option<Vec<usize>> {
1007    let resolved_order = resolved_order?;
1008    let mut referenced = Vec::new();
1009
1010    // Keep one stable slot list without re-parsing order expressions after the
1011    // planner has already frozen structural ORDER BY sources.
1012    for field in resolved_order.fields() {
1013        field.source().extend_referenced_slots(&mut referenced);
1014    }
1015
1016    Some(referenced)
1017}
1018
1019fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
1020    let access_strategy = plan.access.resolve_strategy();
1021    let executable = access_strategy.executable();
1022
1023    resolved_index_slots_for_access_path(model, executable)
1024}
1025
1026fn resolved_index_slots_for_access_path(
1027    model: &EntityModel,
1028    access: &ExecutableAccessPlan<'_, crate::value::Value>,
1029) -> Option<Vec<usize>> {
1030    let path = access.as_path()?;
1031    let path_capabilities = path.capabilities();
1032    let index_fields = path_capabilities.index_fields_for_slot_map()?;
1033    let mut slots = Vec::with_capacity(index_fields.len());
1034
1035    for field_name in index_fields {
1036        let slot = resolve_field_slot(model, field_name)?;
1037        slots.push(slot);
1038    }
1039
1040    Some(slots)
1041}
1042
1043fn index_compile_targets_for_model_plan(
1044    model: &EntityModel,
1045    plan: &AccessPlannedQuery,
1046) -> Option<Vec<IndexCompileTarget>> {
1047    let index = plan.access.as_path()?.selected_index_model()?;
1048    let mut targets = Vec::new();
1049
1050    match index.key_items() {
1051        IndexKeyItemsRef::Fields(fields) => {
1052            for (component_index, &field_name) in fields.iter().enumerate() {
1053                let field_slot = resolve_field_slot(model, field_name)?;
1054                targets.push(IndexCompileTarget {
1055                    component_index,
1056                    field_slot,
1057                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
1058                });
1059            }
1060        }
1061        IndexKeyItemsRef::Items(items) => {
1062            for (component_index, &key_item) in items.iter().enumerate() {
1063                let field_slot = resolve_field_slot(model, key_item.field())?;
1064                targets.push(IndexCompileTarget {
1065                    component_index,
1066                    field_slot,
1067                    key_item,
1068                });
1069            }
1070        }
1071    }
1072
1073    Some(targets)
1074}