Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9    db::{
10        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12        query::plan::{
13            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16            LogicalPlan, PlannerRouteProfile, PredicatePushdownDiagnostics, QueryMode,
17            ResidualFilterContract, ResidualFilterShape, ResolvedOrder, ResolvedOrderField,
18            ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
19            derive_logical_pushdown_eligibility,
20            expr::{
21                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
22                compile_scalar_projection_plan_with_schema,
23            },
24            extend_unique_grouped_aggregate_specs_from_expr, grouped_aggregate_execution_specs,
25            grouped_aggregate_specs_from_projection_spec, grouped_cursor_policy_violation,
26            grouped_plan_strategy, lower_data_row_direct_projection_slots_with_schema,
27            lower_direct_projection_slots_with_schema, lower_projection_identity,
28            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
29            residual_query_predicate_after_filtered_access_contract,
30            resolved_grouped_distinct_execution_strategy_with_schema_info,
31        },
32        schema::SchemaInfo,
33    },
34    error::InternalError,
35    model::{
36        entity::EntityModel,
37        index::{IndexKeyItem, IndexKeyItemsRef},
38    },
39};
40
41impl QueryMode {
42    /// True if this mode represents a load intent.
43    #[must_use]
44    pub const fn is_load(&self) -> bool {
45        match self {
46            Self::Load(_) => true,
47            Self::Delete(_) => false,
48        }
49    }
50
51    /// True if this mode represents a delete intent.
52    #[must_use]
53    pub const fn is_delete(&self) -> bool {
54        match self {
55            Self::Delete(_) => true,
56            Self::Load(_) => false,
57        }
58    }
59}
60
61impl LogicalPlan {
62    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
63    #[must_use]
64    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65        match self {
66            Self::Scalar(plan) => plan,
67            Self::Grouped(plan) => &plan.scalar,
68        }
69    }
70
71    /// Borrow scalar semantic fields mutably across logical variants for tests.
72    #[must_use]
73    #[cfg(test)]
74    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75        match self {
76            Self::Scalar(plan) => plan,
77            Self::Grouped(plan) => &mut plan.scalar,
78        }
79    }
80
81    /// Test-only shorthand for explicit scalar semantic borrowing.
82    #[must_use]
83    #[cfg(test)]
84    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85        self.scalar_semantics()
86    }
87
88    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
89    #[must_use]
90    #[cfg(test)]
91    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92        self.scalar_semantics_mut()
93    }
94}
95
96impl AccessPlannedQuery {
97    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
98    #[must_use]
99    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100        self.logical.scalar_semantics()
101    }
102
103    /// Borrow scalar missing-row consistency without exposing the full scalar
104    /// plan to executor owners that only need row-presence policy.
105    #[must_use]
106    #[cfg(any(test, feature = "sql"))]
107    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
108        self.scalar_plan().consistency
109    }
110
111    /// Borrow scalar semantic fields mutably across logical variants for tests.
112    #[must_use]
113    #[cfg(test)]
114    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
115        self.logical.scalar_semantics_mut()
116    }
117
118    /// Test-only shorthand for explicit scalar plan borrowing.
119    #[must_use]
120    #[cfg(test)]
121    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
122        self.scalar_plan()
123    }
124
125    /// Test-only shorthand for explicit mutable scalar plan borrowing.
126    #[must_use]
127    #[cfg(test)]
128    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
129        self.scalar_plan_mut()
130    }
131
132    /// Borrow grouped semantic fields when this plan is grouped.
133    #[must_use]
134    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
135        match &self.logical {
136            LogicalPlan::Scalar(_) => None,
137            LogicalPlan::Grouped(plan) => Some(plan),
138        }
139    }
140
141    /// Lower this plan into one canonical planner-owned projection semantic spec.
142    #[must_use]
143    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
144        if let Some(static_contract) = &self.static_execution_planning_contract {
145            return static_contract.projection_spec.clone();
146        }
147
148        lower_projection_intent(model, &self.logical, &self.projection_selection)
149    }
150
151    /// Lower this plan into one projection semantic shape for identity hashing.
152    #[must_use]
153    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
154        lower_projection_identity(&self.logical, &self.projection_selection)
155    }
156
157    /// Return the executor-facing predicate after removing only filtered-index
158    /// guard clauses the chosen access path already proves.
159    ///
160    /// This conservative form is used by preparation/explain surfaces that
161    /// still need to see access-bound equalities as index-predicate input.
162    #[must_use]
163    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
164        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
165            return static_contract.execution_preparation_predicate.clone();
166        }
167
168        derive_execution_preparation_predicate(self)
169    }
170
171    /// Return the executor-facing residual predicate after removing any
172    /// filtered-index guard clauses and fixed access-bound equalities already
173    /// guaranteed by the chosen path.
174    #[must_use]
175    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
176        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
177            return static_contract
178                .residual_filter_contract
179                .residual_filter_predicate()
180                .cloned();
181        }
182
183        derive_residual_filter_predicate(self)
184    }
185
186    /// Return whether one explicit residual predicate survives access
187    /// planning and still participates in residual execution.
188    #[must_use]
189    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
190        self.effective_execution_predicate().is_some()
191    }
192
193    /// Borrow the planner-owned residual scalar filter expression when one
194    /// surviving semantic remainder still requires runtime evaluation.
195    #[must_use]
196    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
197        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
198            return static_contract
199                .residual_filter_contract
200                .residual_filter_expr();
201        }
202
203        if !derive_has_residual_filter(self) {
204            return None;
205        }
206
207        self.scalar_plan().filter_expr.as_ref()
208    }
209
210    /// Return whether one explicit residual scalar filter expression survives
211    /// access planning and still requires runtime evaluation.
212    #[must_use]
213    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
214        self.residual_filter_expr().is_some()
215    }
216
217    /// Return the planner-owned residual-filter shape used by diagnostics.
218    #[must_use]
219    pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
220        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
221            return static_contract.residual_filter_contract.shape();
222        }
223
224        ResidualFilterShape::from_presence(
225            self.residual_filter_expr().is_some(),
226            self.effective_execution_predicate().is_some(),
227        )
228    }
229
230    /// Return the planner-owned predicate pushdown label consumed by verbose
231    /// execution diagnostics.
232    #[must_use]
233    pub(in crate::db) fn predicate_pushdown_label(&self) -> String {
234        self.predicate_pushdown_diagnostics().label()
235    }
236
237    /// Return planner-owned predicate-pushdown diagnostics.
238    #[must_use]
239    pub(in crate::db) fn predicate_pushdown_diagnostics(&self) -> PredicatePushdownDiagnostics {
240        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
241            return static_contract.predicate_pushdown_diagnostics;
242        }
243
244        derive_predicate_pushdown_diagnostics(self, self.residual_filter_shape())
245    }
246
247    /// Return the planner-owned predicate-pushdown outcome label.
248    #[must_use]
249    pub(in crate::db) fn predicate_pushdown_outcome_label(&self) -> &'static str {
250        self.predicate_pushdown_diagnostics().outcome_label()
251    }
252
253    /// Return the planner-owned predicate-pushdown reason label.
254    #[must_use]
255    pub(in crate::db) fn predicate_pushdown_reason_label(&self) -> &'static str {
256        self.predicate_pushdown_diagnostics().reason_label()
257    }
258
259    /// Borrow the planner-compiled execution-preparation predicate program.
260    #[must_use]
261    pub(in crate::db) fn execution_preparation_compiled_predicate(
262        &self,
263    ) -> Option<&PredicateProgram> {
264        self.static_execution_planning_contract()?
265            .execution_preparation_compiled_predicate
266            .as_ref()
267    }
268
269    /// Borrow the planner-compiled effective runtime predicate program.
270    #[must_use]
271    pub(in crate::db) fn effective_runtime_compiled_predicate(&self) -> Option<&PredicateProgram> {
272        match self
273            .static_execution_planning_contract()?
274            .residual_filter_contract
275            .effective_runtime_filter_program()
276        {
277            Some(program) => program.predicate_program(),
278            None => None,
279        }
280    }
281
282    /// Borrow the planner-compiled effective runtime scalar filter expression.
283    #[cfg(test)]
284    #[must_use]
285    pub(in crate::db) fn effective_runtime_compiled_filter_expr(&self) -> Option<&CompiledExpr> {
286        match self
287            .static_execution_planning_contract()?
288            .residual_filter_contract
289            .effective_runtime_filter_program()
290        {
291            Some(program) => program.expression_filter(),
292            None => None,
293        }
294    }
295
296    /// Borrow the planner-frozen effective runtime scalar filter program.
297    #[must_use]
298    pub(in crate::db) fn effective_runtime_filter_program(
299        &self,
300    ) -> Option<&EffectiveRuntimeFilterProgram> {
301        self.static_execution_planning_contract()?
302            .residual_filter_contract
303            .effective_runtime_filter_program()
304    }
305
306    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
307    #[must_use]
308    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
309        if !self.scalar_plan().distinct {
310            return DistinctExecutionStrategy::None;
311        }
312
313        // DISTINCT on duplicate-safe single-path access shapes is a planner
314        // no-op for runtime dedup mechanics. Composite shapes can surface
315        // duplicate keys and therefore retain explicit dedup execution.
316        match distinct_runtime_dedup_strategy(&self.access) {
317            Some(strategy) => strategy,
318            None => DistinctExecutionStrategy::None,
319        }
320    }
321
322    /// Freeze one planner-owned route profile after model validation completes.
323    #[cfg(test)]
324    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
325        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
326    }
327
328    /// Freeze one planner-owned route profile from accepted schema authority.
329    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
330        &mut self,
331        schema_info: &SchemaInfo,
332    ) {
333        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
334    }
335
336    /// Freeze model-only executor metadata after logical/access planning completes.
337    #[cfg(test)]
338    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
339        &mut self,
340        model: &EntityModel,
341    ) -> Result<(), InternalError> {
342        self.finalize_static_execution_planning_contract_for_model_with_schema(
343            model,
344            SchemaInfo::cached_for_generated_entity_model(model),
345        )
346    }
347
348    /// Freeze planner-owned executor metadata with explicit schema authority.
349    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
350        &mut self,
351        model: &EntityModel,
352        schema_info: &SchemaInfo,
353    ) -> Result<(), InternalError> {
354        self.static_execution_planning_contract = Some(
355            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
356        );
357
358        Ok(())
359    }
360
361    /// Build one immutable execution-shape signature contract for runtime layers.
362    #[must_use]
363    pub(in crate::db) fn execution_shape_signature(
364        &self,
365        entity_path: &'static str,
366    ) -> ExecutionShapeSignature {
367        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
368    }
369
370    /// Return whether the chosen access contract fully satisfies the current
371    /// scalar query predicate without any additional post-access filtering.
372    #[must_use]
373    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
374        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
375            return self.scalar_plan().predicate.is_some()
376                && !static_contract
377                    .residual_filter_contract
378                    .has_residual_filter();
379        }
380
381        derive_predicate_fully_satisfied_by_access_contract(self)
382    }
383
384    /// Borrow the planner-frozen compiled scalar projection program.
385    #[must_use]
386    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
387        self.static_execution_planning_contract()?
388            .scalar_projection_plan
389            .as_deref()
390    }
391
392    /// Return whether planner-owned static execution metadata has already been frozen.
393    #[must_use]
394    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
395        self.static_execution_planning_contract.is_some()
396    }
397
398    /// Borrow the planner-frozen ordered primary-key field names.
399    pub(in crate::db) fn primary_key_names(&self) -> Result<Vec<&str>, InternalError> {
400        Ok(self
401            .require_static_execution_planning_contract()?
402            .primary_key_names
403            .iter()
404            .map(String::as_str)
405            .collect())
406    }
407
408    /// Borrow the planner-frozen projection slot reachability set.
409    pub(in crate::db) fn projection_referenced_slots(&self) -> Result<&[usize], InternalError> {
410        Ok(self
411            .require_static_execution_planning_contract()?
412            .projection_referenced_slots
413            .as_slice())
414    }
415
416    /// Borrow the planner-frozen mask for direct projected output slots.
417    #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
418    pub(in crate::db) fn projected_slot_mask(&self) -> Result<&[bool], InternalError> {
419        Ok(self
420            .require_static_execution_planning_contract()?
421            .projected_slot_mask
422            .as_slice())
423    }
424
425    /// Return whether projection remains the full model-identity field list.
426    pub(in crate::db) fn projection_is_model_identity(&self) -> Result<bool, InternalError> {
427        Ok(self
428            .require_static_execution_planning_contract()?
429            .projection_is_model_identity)
430    }
431
432    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
433    #[must_use]
434    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
435        self.static_execution_planning_contract()?
436            .order_referenced_slots
437            .as_deref()
438    }
439
440    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
441    #[must_use]
442    pub(in crate::db) fn resolved_order(&self) -> Option<&ResolvedOrder> {
443        self.static_execution_planning_contract()?
444            .resolved_order
445            .as_ref()
446    }
447
448    /// Borrow the planner-frozen access slot map used by index predicate compilation.
449    #[must_use]
450    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
451        self.static_execution_planning_contract()?
452            .slot_map
453            .as_deref()
454    }
455
456    /// Borrow grouped aggregate execution specs already resolved during static planning.
457    #[must_use]
458    pub(in crate::db) fn grouped_aggregate_execution_specs(
459        &self,
460    ) -> Option<&[GroupedAggregateExecutionSpec]> {
461        self.static_execution_planning_contract()?
462            .grouped_aggregate_execution_specs
463            .as_deref()
464    }
465
466    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
467    #[must_use]
468    pub(in crate::db) fn grouped_distinct_execution_strategy(
469        &self,
470    ) -> Option<&GroupedDistinctExecutionStrategy> {
471        self.static_execution_planning_contract()?
472            .grouped_distinct_execution_strategy
473            .as_ref()
474    }
475
476    /// Borrow the frozen projection semantic shape without reopening model ownership.
477    pub(in crate::db) fn frozen_projection_spec(&self) -> Result<&ProjectionSpec, InternalError> {
478        Ok(&self
479            .require_static_execution_planning_contract()?
480            .projection_spec)
481    }
482
483    /// Borrow the frozen direct projection slots without reopening model ownership.
484    #[must_use]
485    #[cfg(any(test, feature = "sql"))]
486    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
487        self.static_execution_planning_contract()?
488            .projection_direct_slots
489            .as_deref()
490    }
491
492    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
493    #[must_use]
494    #[cfg(any(test, feature = "sql"))]
495    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
496        self.static_execution_planning_contract()?
497            .projection_data_row_direct_slots
498            .as_deref()
499    }
500
501    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
502    #[must_use]
503    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
504        self.static_execution_planning_contract()?
505            .index_compile_targets
506            .as_deref()
507    }
508
509    const fn static_execution_planning_contract(&self) -> Option<&StaticExecutionPlanningContract> {
510        self.static_execution_planning_contract.as_ref()
511    }
512
513    fn require_static_execution_planning_contract(
514        &self,
515    ) -> Result<&StaticExecutionPlanningContract, InternalError> {
516        self.static_execution_planning_contract
517            .as_ref()
518            .ok_or_else(InternalError::query_executor_invariant)
519    }
520}
521
522fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
523    match access {
524        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
525            Some(DistinctExecutionStrategy::PreOrdered)
526        }
527        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
528            Some(DistinctExecutionStrategy::HashMaterialize)
529        }
530        AccessPlan::Path(_) => None,
531    }
532}
533
534fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
535    let is_grouped_safe = plan
536        .grouped_plan()
537        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
538
539    ContinuationPolicy::new(
540        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
541        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
542        is_grouped_safe,
543    )
544}
545
546/// Project one planner-owned route profile from the finalized logical+access plan.
547#[must_use]
548#[cfg(test)]
549pub(in crate::db) fn project_planner_route_profile_for_model(
550    model: &EntityModel,
551    plan: &AccessPlannedQuery,
552) -> PlannerRouteProfile {
553    let primary_key_names = ordered_primary_key_names(model);
554    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
555        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
556    });
557
558    PlannerRouteProfile::new(
559        derive_continuation_policy_validated(plan),
560        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
561        secondary_order_contract,
562    )
563}
564
565/// Project one planner-owned route profile from accepted schema authority.
566#[must_use]
567pub(in crate::db) fn project_planner_route_profile_for_schema(
568    schema_info: &SchemaInfo,
569    plan: &AccessPlannedQuery,
570) -> PlannerRouteProfile {
571    let primary_key_names = primary_key_names_from_schema(schema_info);
572    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
573        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
574    });
575
576    PlannerRouteProfile::new(
577        derive_continuation_policy_validated(plan),
578        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
579        secondary_order_contract,
580    )
581}
582
583fn project_static_execution_planning_contract_for_model(
584    model: &EntityModel,
585    schema_info: &SchemaInfo,
586    plan: &AccessPlannedQuery,
587) -> Result<StaticExecutionPlanningContract, InternalError> {
588    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
589    let execution_preparation_predicate = plan.execution_preparation_predicate();
590    let residual_filter_predicate = derive_residual_filter_predicate(plan);
591    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
592    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
593        schema_info,
594        residual_filter_expr.as_ref(),
595        residual_filter_predicate.as_ref(),
596    )?;
597    let residual_filter_contract = ResidualFilterContract::new(
598        residual_filter_expr,
599        residual_filter_predicate,
600        effective_runtime_filter_program,
601    );
602    let residual_filter_shape = residual_filter_contract.shape();
603    let execution_preparation_compiled_predicate =
604        should_compile_execution_preparation_predicate(residual_filter_shape)
605            .then(|| {
606                compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref())
607            })
608            .flatten();
609    let predicate_pushdown_diagnostics =
610        derive_predicate_pushdown_diagnostics(plan, residual_filter_shape);
611    let scalar_projection_plan = if plan.grouped_plan().is_none() {
612        Some(
613            compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
614                .ok_or_else(InternalError::query_executor_invariant)?
615                .iter()
616                .map(CompiledExpr::compile)
617                .collect(),
618        )
619    } else {
620        None
621    };
622    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
623        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
624    let projection_direct_slots = lower_direct_projection_slots_with_schema(
625        model,
626        schema_info,
627        &plan.logical,
628        &plan.projection_selection,
629    );
630    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
631        model,
632        schema_info,
633        &plan.logical,
634        &plan.projection_selection,
635    );
636    let projection_referenced_slots =
637        projection_spec.referenced_slots_for_schema(model, schema_info)?;
638    let projected_slot_mask =
639        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
640    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
641    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
642    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
643    let slot_map = slot_map_for_schema_plan(schema_info, plan);
644    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
645
646    Ok(StaticExecutionPlanningContract {
647        primary_key_names: schema_info.primary_key_names().to_vec(),
648        projection_spec,
649        execution_preparation_predicate,
650        execution_preparation_compiled_predicate,
651        residual_filter_contract,
652        predicate_pushdown_diagnostics,
653        scalar_projection_plan,
654        grouped_aggregate_execution_specs,
655        grouped_distinct_execution_strategy,
656        projection_direct_slots,
657        projection_data_row_direct_slots,
658        projection_referenced_slots,
659        projected_slot_mask,
660        projection_is_model_identity,
661        resolved_order,
662        order_referenced_slots,
663        slot_map,
664        index_compile_targets,
665    })
666}
667
668#[cfg(test)]
669fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
670    model.primary_key_names()
671}
672
673fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
674    schema_info
675        .primary_key_names()
676        .iter()
677        .map(String::as_str)
678        .collect()
679}
680
681// Compile the executor-owned residual scalar filter contract once from the
682// planner-derived residual artifacts so runtime never has to rediscover
683// residual presence or shape from semantic/filter/pushdown state.
684fn compile_effective_runtime_filter_program(
685    schema_info: &SchemaInfo,
686    residual_filter_expr: Option<&Expr>,
687    residual_filter_predicate: Option<&Predicate>,
688) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
689    // Keep the existing predicate fast path when the residual semantics still
690    // fit the derived predicate contract. The expression-owned lane is only
691    // needed once pushdown loses semantic coverage and a residual predicate no
692    // longer exists.
693    if let Some(predicate) = residual_filter_predicate {
694        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
695            PredicateProgram::compile_with_schema_info(schema_info, predicate),
696        )));
697    }
698
699    if let Some(filter_expr) = residual_filter_expr {
700        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
701            .ok_or_else(InternalError::query_invalid_logical_plan)?;
702
703        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
704            CompiledExpr::compile(&compiled),
705        )));
706    }
707
708    Ok(None)
709}
710
711// Derive the executor-preparation predicate once from the selected access path.
712// This strips only filtered-index guard clauses while preserving access-bound
713// equalities that still matter to preparation/explain consumers.
714fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
715    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
716
717    match plan.access.selected_index_contract() {
718        Some(index) => {
719            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
720        }
721        None => Some(query_predicate.clone()),
722    }
723}
724
725// Derive the final residual predicate once from the already-filtered
726// preparation predicate plus any equality bounds guaranteed by the concrete
727// access path.
728fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
729    let filtered_residual = derive_execution_preparation_predicate(plan);
730    let filtered_residual = filtered_residual.as_ref()?;
731
732    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
733}
734
735// Derive the explicit residual semantic expression once for finalized plans.
736// The residual expression remains the planner-owned semantic filter when any
737// runtime filtering still survives access satisfaction.
738fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
739    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
740    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
741        return None;
742    }
743
744    Some(filter_expr.clone())
745}
746
747// Derive the explicit residual semantic expression during finalization using
748// the trusted entity schema so compare-family literal normalization matches the
749// planner-owned predicate contract before residual ownership is decided.
750fn derive_residual_filter_expr_for_model(
751    model: &EntityModel,
752    plan: &AccessPlannedQuery,
753) -> Option<Expr> {
754    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
755    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
756        return None;
757    }
758
759    Some(filter_expr.clone())
760}
761
762// Return whether any residual filtering survives after access planning. This
763// helper exists only for pre-finalization assembly; finalized plans must read
764// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
765fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
766    match (
767        plan.scalar_plan().filter_expr.as_ref(),
768        plan.scalar_plan().predicate.as_ref(),
769    ) {
770        (None, None) => false,
771        (Some(_), None) => true,
772        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
773    }
774}
775
776// Freeze predicate-pushdown diagnostics from one logical plan shape. This keeps
777// lazy plan accessors and finalized static planning on the same argument
778// contract while leaving route selection and residual filtering unchanged.
779fn derive_predicate_pushdown_diagnostics(
780    plan: &AccessPlannedQuery,
781    residual_filter_shape: ResidualFilterShape,
782) -> PredicatePushdownDiagnostics {
783    PredicatePushdownDiagnostics::from_plan(
784        plan.scalar_plan().filter_expr.is_some(),
785        plan.scalar_plan().predicate_covers_filter_expr,
786        plan.scalar_plan().predicate.as_ref(),
787        &plan.access,
788        residual_filter_shape,
789    )
790}
791
792// Return true when the planner-owned predicate contract is fully satisfied by
793// access planning and no semantic residual filter expression survives.
794fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
795    plan.scalar_plan().predicate.is_some()
796        && derive_residual_filter_predicate(plan).is_none()
797        && derive_residual_filter_expr(plan).is_none()
798}
799
800// Return true when the semantic filter expression is entirely represented by
801// the planner-owned predicate contract and the chosen access path satisfies
802// that predicate without any runtime remainder.
803const fn derive_semantic_filter_fully_satisfied_by_access_contract(
804    plan: &AccessPlannedQuery,
805) -> bool {
806    plan.scalar_plan().filter_expr.is_some()
807        && plan.scalar_plan().predicate.is_some()
808        && plan.scalar_plan().predicate_covers_filter_expr
809}
810
811// Return true when finalized planning can prove that the semantic filter
812// expression is completely represented by the planner-owned predicate contract
813// after aligning compare literals through the trusted entity schema.
814const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
815    _model: &EntityModel,
816    plan: &AccessPlannedQuery,
817) -> bool {
818    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
819}
820
821// Compile one optional planner-frozen predicate program while keeping the
822// static planning assembly path free of repeated `Option` mapping boilerplate.
823fn compile_optional_predicate(
824    schema_info: &SchemaInfo,
825    predicate: Option<&Predicate>,
826) -> Option<PredicateProgram> {
827    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
828}
829
830// Avoid compiling large access-proven predicates into executor preparation.
831// When no residual filter survives, the chosen access route already enforces
832// the predicate and route/explain consumers can use the explicit residual
833// contract instead of recompiling access-bound literals.
834const fn should_compile_execution_preparation_predicate(
835    residual_filter_shape: ResidualFilterShape,
836) -> bool {
837    !residual_filter_shape.is_absent()
838}
839
840// Resolve the grouped-only static planning semantics bundle once so grouped
841// aggregate execution specs and grouped DISTINCT strategy stay derived under
842// one shared grouped-plan branch.
843fn resolve_grouped_static_planning_semantics(
844    schema_info: &SchemaInfo,
845    plan: &AccessPlannedQuery,
846    projection_spec: &ProjectionSpec,
847) -> Result<
848    (
849        Option<Vec<GroupedAggregateExecutionSpec>>,
850        Option<GroupedDistinctExecutionStrategy>,
851    ),
852    InternalError,
853> {
854    let Some(grouped) = plan.grouped_plan() else {
855        return Ok((None, None));
856    };
857
858    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
859        projection_spec,
860        grouped.group.group_fields.as_slice(),
861        grouped.group.aggregates.as_slice(),
862    )?;
863    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
864
865    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
866        schema_info,
867        aggregate_specs.as_slice(),
868    )?);
869    let grouped_distinct_execution_strategy = Some(
870        resolved_grouped_distinct_execution_strategy_with_schema_info(
871            schema_info,
872            grouped.group.group_fields.as_slice(),
873            grouped.group.aggregates.as_slice(),
874            grouped.having_expr.as_ref(),
875        )?,
876    );
877
878    Ok((
879        grouped_aggregate_execution_specs,
880        grouped_distinct_execution_strategy,
881    ))
882}
883
884fn extend_grouped_having_aggregate_specs(
885    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
886    grouped: &GroupPlan,
887) -> Result<(), InternalError> {
888    if let Some(having_expr) = grouped.having_expr.as_ref() {
889        extend_unique_grouped_aggregate_specs_from_expr(aggregate_specs, having_expr)?;
890    }
891
892    Ok(())
893}
894
895fn projected_slot_mask_for_spec(
896    model: &EntityModel,
897    direct_projection_slots: Option<&[usize]>,
898) -> Vec<bool> {
899    let schema_slot_len = direct_projection_slots
900        .and_then(|slots| slots.iter().copied().max())
901        .map_or(0, |slot| slot.saturating_add(1));
902    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
903
904    let Some(direct_projection_slots) = direct_projection_slots else {
905        return projected_slots;
906    };
907
908    for slot in direct_projection_slots.iter().copied() {
909        if let Some(projected) = projected_slots.get_mut(slot) {
910            *projected = true;
911        }
912    }
913
914    projected_slots
915}
916
917fn resolved_order_for_plan(
918    schema_info: &SchemaInfo,
919    plan: &AccessPlannedQuery,
920) -> Result<Option<ResolvedOrder>, InternalError> {
921    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
922        return Ok(None);
923    }
924
925    let Some(order) = plan.scalar_plan().order.as_ref() else {
926        return Ok(None);
927    };
928
929    let mut fields = Vec::with_capacity(order.fields.len());
930    for term in &order.fields {
931        fields.push(ResolvedOrderField::new(
932            resolved_order_value_source_for_term(schema_info, term)?,
933            term.direction(),
934        ));
935    }
936
937    Ok(Some(ResolvedOrder::new(fields)))
938}
939
940fn resolved_order_value_source_for_term(
941    schema_info: &SchemaInfo,
942    term: &crate::db::query::plan::OrderTerm,
943) -> Result<ResolvedOrderValueSource, InternalError> {
944    if term.direct_field().is_none() {
945        let rendered = term.rendered_label();
946        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
947        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
948            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
949
950        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
951            &compiled,
952        )));
953    }
954
955    let Some(field) = term.direct_field() else {
956        return Err(InternalError::query_invalid_logical_plan());
957    };
958    let slot = resolve_required_schema_slot(
959        schema_info,
960        field,
961        InternalError::query_invalid_logical_plan,
962    )?;
963
964    Ok(ResolvedOrderValueSource::direct_field(slot))
965}
966
967fn validate_resolved_order_expr_fields(
968    schema_info: &SchemaInfo,
969    expr: &Expr,
970    rendered: &str,
971) -> Result<(), InternalError> {
972    expr.try_for_each_tree_expr(&mut |node| match node {
973        Expr::Field(field_id) => resolve_required_schema_slot(
974            schema_info,
975            field_id.as_str(),
976            InternalError::query_invalid_logical_plan,
977        )
978        .map(|_| ()),
979        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
980        #[cfg(test)]
981        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
982        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
983        _ => Ok(()),
984    })
985}
986
987// Resolve one schema-authoritative field slot while keeping planner
988// invalid-logical-plan error construction at the callsite that owns the
989// diagnostic wording.
990fn resolve_required_schema_slot<F>(
991    schema_info: &SchemaInfo,
992    field: &str,
993    invalid_plan_error: F,
994) -> Result<usize, InternalError>
995where
996    F: FnOnce() -> InternalError,
997{
998    schema_info
999        .field_slot_index(field)
1000        .ok_or_else(invalid_plan_error)
1001}
1002
1003// Keep the scalar-order expression seam violation text under one helper so the
1004// parse validation and compile validation paths do not drift.
1005fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
1006    InternalError::query_invalid_logical_plan()
1007}
1008
1009// Keep one stable executor-facing slot list for grouped order terms after the
1010// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
1011// now consumes this same referenced-slot contract instead of re-deriving order
1012// sources from planner strategy at runtime.
1013fn order_referenced_slots_for_resolved_order(
1014    resolved_order: Option<&ResolvedOrder>,
1015) -> Option<Vec<usize>> {
1016    Some(resolved_order?.referenced_slots())
1017}
1018
1019fn slot_map_for_schema_plan(
1020    schema_info: &SchemaInfo,
1021    plan: &AccessPlannedQuery,
1022) -> Option<Vec<usize>> {
1023    let executable = plan.access.executable_contract();
1024
1025    resolved_index_slots_for_access_path(schema_info, &executable)
1026}
1027
1028fn resolved_index_slots_for_access_path(
1029    schema_info: &SchemaInfo,
1030    access: &ExecutableAccessPlan<'_, crate::value::Value>,
1031) -> Option<Vec<usize>> {
1032    let path = access.as_path()?;
1033    let path_facts = path.shape_facts();
1034    let key_items = path_facts.index_key_items_for_slot_map()?;
1035    let mut slots = Vec::new();
1036
1037    match key_items.key_items() {
1038        SemanticIndexKeyItemsRef::Fields(fields) => {
1039            slots.reserve(fields.len());
1040            for field_name in fields {
1041                let slot = schema_info.field_slot_index(field_name)?;
1042                slots.push(slot);
1043            }
1044        }
1045        SemanticIndexKeyItemsRef::Accepted(items) => {
1046            slots.reserve(items.len());
1047            for key_item in items {
1048                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1049                slots.push(slot);
1050            }
1051        }
1052        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1053            slots.reserve(fields.len());
1054            for &field_name in fields {
1055                let slot = schema_info.field_slot_index(field_name)?;
1056                slots.push(slot);
1057            }
1058        }
1059        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1060            slots.reserve(items.len());
1061            for key_item in items {
1062                let slot = schema_info.field_slot_index(key_item.field())?;
1063                slots.push(slot);
1064            }
1065        }
1066    }
1067
1068    Some(slots)
1069}
1070
1071fn index_compile_targets_for_schema_plan(
1072    schema_info: &SchemaInfo,
1073    plan: &AccessPlannedQuery,
1074) -> Option<Vec<IndexCompileTarget>> {
1075    let executable = plan.access.executable_contract();
1076    let path = executable.as_path()?;
1077    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1078    let mut targets = Vec::new();
1079
1080    match key_items.key_items() {
1081        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1082            return None;
1083        }
1084        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1085            for (component_index, &field_name) in fields.iter().enumerate() {
1086                let field_slot = schema_info.field_slot_index(field_name)?;
1087                targets.push(IndexCompileTarget {
1088                    component_index,
1089                    field_slot,
1090                    key_item: IndexKeyItem::Field(field_name),
1091                });
1092            }
1093        }
1094        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1095            for (component_index, &key_item) in items.iter().enumerate() {
1096                let field_slot = schema_info.field_slot_index(key_item.field())?;
1097                targets.push(IndexCompileTarget {
1098                    component_index,
1099                    field_slot,
1100                    key_item,
1101                });
1102            }
1103        }
1104    }
1105
1106    Some(targets)
1107}