Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9    db::{
10        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12        query::plan::{
13            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16            LogicalPlan, PlannerRouteProfile, PredicatePushdownDiagnostics, QueryMode,
17            ResidualFilterContract, ResidualFilterShape, ResolvedOrder, ResolvedOrderField,
18            ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
19            derive_logical_pushdown_eligibility,
20            expr::{
21                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
22                compile_scalar_projection_plan_with_schema,
23            },
24            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
25            grouped_cursor_policy_violation, grouped_plan_strategy,
26            lower_data_row_direct_projection_slots_with_schema,
27            lower_direct_projection_slots_with_schema, lower_projection_identity,
28            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
29            residual_query_predicate_after_filtered_access_contract,
30            resolved_grouped_distinct_execution_strategy_with_schema_info,
31        },
32        schema::SchemaInfo,
33    },
34    error::InternalError,
35    model::{
36        entity::EntityModel,
37        index::{IndexKeyItem, IndexKeyItemsRef},
38    },
39};
40
41impl QueryMode {
42    /// True if this mode represents a load intent.
43    #[must_use]
44    pub const fn is_load(&self) -> bool {
45        match self {
46            Self::Load(_) => true,
47            Self::Delete(_) => false,
48        }
49    }
50
51    /// True if this mode represents a delete intent.
52    #[must_use]
53    pub const fn is_delete(&self) -> bool {
54        match self {
55            Self::Delete(_) => true,
56            Self::Load(_) => false,
57        }
58    }
59}
60
61impl LogicalPlan {
62    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
63    #[must_use]
64    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65        match self {
66            Self::Scalar(plan) => plan,
67            Self::Grouped(plan) => &plan.scalar,
68        }
69    }
70
71    /// Borrow scalar semantic fields mutably across logical variants for tests.
72    #[must_use]
73    #[cfg(test)]
74    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75        match self {
76            Self::Scalar(plan) => plan,
77            Self::Grouped(plan) => &mut plan.scalar,
78        }
79    }
80
81    /// Test-only shorthand for explicit scalar semantic borrowing.
82    #[must_use]
83    #[cfg(test)]
84    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85        self.scalar_semantics()
86    }
87
88    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
89    #[must_use]
90    #[cfg(test)]
91    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92        self.scalar_semantics_mut()
93    }
94}
95
96impl AccessPlannedQuery {
97    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
98    #[must_use]
99    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100        self.logical.scalar_semantics()
101    }
102
103    /// Borrow scalar missing-row consistency without exposing the full scalar
104    /// plan to executor owners that only need row-presence policy.
105    #[must_use]
106    #[cfg(any(test, feature = "sql"))]
107    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
108        self.scalar_plan().consistency
109    }
110
111    /// Borrow scalar semantic fields mutably across logical variants for tests.
112    #[must_use]
113    #[cfg(test)]
114    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
115        self.logical.scalar_semantics_mut()
116    }
117
118    /// Test-only shorthand for explicit scalar plan borrowing.
119    #[must_use]
120    #[cfg(test)]
121    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
122        self.scalar_plan()
123    }
124
125    /// Test-only shorthand for explicit mutable scalar plan borrowing.
126    #[must_use]
127    #[cfg(test)]
128    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
129        self.scalar_plan_mut()
130    }
131
132    /// Borrow grouped semantic fields when this plan is grouped.
133    #[must_use]
134    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
135        match &self.logical {
136            LogicalPlan::Scalar(_) => None,
137            LogicalPlan::Grouped(plan) => Some(plan),
138        }
139    }
140
141    /// Lower this plan into one canonical planner-owned projection semantic spec.
142    #[must_use]
143    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
144        if let Some(static_contract) = &self.static_execution_planning_contract {
145            return static_contract.projection_spec.clone();
146        }
147
148        lower_projection_intent(model, &self.logical, &self.projection_selection)
149    }
150
151    /// Lower this plan into one projection semantic shape for identity hashing.
152    #[must_use]
153    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
154        lower_projection_identity(&self.logical, &self.projection_selection)
155    }
156
157    /// Return the executor-facing predicate after removing only filtered-index
158    /// guard clauses the chosen access path already proves.
159    ///
160    /// This conservative form is used by preparation/explain surfaces that
161    /// still need to see access-bound equalities as index-predicate input.
162    #[must_use]
163    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
164        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
165            return static_contract.execution_preparation_predicate.clone();
166        }
167
168        derive_execution_preparation_predicate(self)
169    }
170
171    /// Return the executor-facing residual predicate after removing any
172    /// filtered-index guard clauses and fixed access-bound equalities already
173    /// guaranteed by the chosen path.
174    #[must_use]
175    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
176        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
177            return static_contract
178                .residual_filter_contract
179                .residual_filter_predicate()
180                .cloned();
181        }
182
183        derive_residual_filter_predicate(self)
184    }
185
186    /// Return whether one explicit residual predicate survives access
187    /// planning and still participates in residual execution.
188    #[must_use]
189    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
190        self.effective_execution_predicate().is_some()
191    }
192
193    /// Borrow the planner-owned residual scalar filter expression when one
194    /// surviving semantic remainder still requires runtime evaluation.
195    #[must_use]
196    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
197        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
198            return static_contract
199                .residual_filter_contract
200                .residual_filter_expr();
201        }
202
203        if !derive_has_residual_filter(self) {
204            return None;
205        }
206
207        self.scalar_plan().filter_expr.as_ref()
208    }
209
210    /// Return whether one explicit residual scalar filter expression survives
211    /// access planning and still requires runtime evaluation.
212    #[must_use]
213    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
214        self.residual_filter_expr().is_some()
215    }
216
217    /// Return the planner-owned residual-filter shape used by diagnostics.
218    #[must_use]
219    pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
220        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
221            return static_contract.residual_filter_contract.shape();
222        }
223
224        ResidualFilterShape::from_presence(
225            self.residual_filter_expr().is_some(),
226            self.effective_execution_predicate().is_some(),
227        )
228    }
229
230    /// Return the planner-owned predicate pushdown label consumed by verbose
231    /// execution diagnostics.
232    #[must_use]
233    pub(in crate::db) fn predicate_pushdown_label(&self) -> String {
234        self.predicate_pushdown_diagnostics().label()
235    }
236
237    /// Return planner-owned predicate-pushdown diagnostics.
238    #[must_use]
239    pub(in crate::db) fn predicate_pushdown_diagnostics(&self) -> PredicatePushdownDiagnostics {
240        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
241            return static_contract.predicate_pushdown_diagnostics;
242        }
243
244        PredicatePushdownDiagnostics::from_plan(
245            self.scalar_plan().filter_expr.is_some(),
246            self.scalar_plan().predicate_covers_filter_expr,
247            self.scalar_plan().predicate.as_ref(),
248            &self.access,
249            self.residual_filter_shape(),
250        )
251    }
252
253    /// Return the planner-owned predicate-pushdown outcome label.
254    #[must_use]
255    pub(in crate::db) fn predicate_pushdown_outcome_label(&self) -> &'static str {
256        self.predicate_pushdown_diagnostics().outcome_label()
257    }
258
259    /// Return the planner-owned predicate-pushdown reason label.
260    #[must_use]
261    pub(in crate::db) fn predicate_pushdown_reason_label(&self) -> &'static str {
262        self.predicate_pushdown_diagnostics().reason_label()
263    }
264
265    /// Borrow the planner-compiled execution-preparation predicate program.
266    #[must_use]
267    pub(in crate::db) const fn execution_preparation_compiled_predicate(
268        &self,
269    ) -> Option<&PredicateProgram> {
270        self.static_execution_planning_contract()
271            .execution_preparation_compiled_predicate
272            .as_ref()
273    }
274
275    /// Borrow the planner-compiled effective runtime predicate program.
276    #[must_use]
277    pub(in crate::db) const fn effective_runtime_compiled_predicate(
278        &self,
279    ) -> Option<&PredicateProgram> {
280        match self
281            .static_execution_planning_contract()
282            .residual_filter_contract
283            .effective_runtime_filter_program()
284        {
285            Some(program) => program.predicate_program(),
286            None => None,
287        }
288    }
289
290    /// Borrow the planner-compiled effective runtime scalar filter expression.
291    #[cfg(test)]
292    #[must_use]
293    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
294        &self,
295    ) -> Option<&CompiledExpr> {
296        match self
297            .static_execution_planning_contract()
298            .residual_filter_contract
299            .effective_runtime_filter_program()
300        {
301            Some(program) => program.expression_filter(),
302            None => None,
303        }
304    }
305
306    /// Borrow the planner-frozen effective runtime scalar filter program.
307    #[must_use]
308    pub(in crate::db) const fn effective_runtime_filter_program(
309        &self,
310    ) -> Option<&EffectiveRuntimeFilterProgram> {
311        self.static_execution_planning_contract()
312            .residual_filter_contract
313            .effective_runtime_filter_program()
314    }
315
316    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
317    #[must_use]
318    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
319        if !self.scalar_plan().distinct {
320            return DistinctExecutionStrategy::None;
321        }
322
323        // DISTINCT on duplicate-safe single-path access shapes is a planner
324        // no-op for runtime dedup mechanics. Composite shapes can surface
325        // duplicate keys and therefore retain explicit dedup execution.
326        match distinct_runtime_dedup_strategy(&self.access) {
327            Some(strategy) => strategy,
328            None => DistinctExecutionStrategy::None,
329        }
330    }
331
332    /// Freeze one planner-owned route profile after model validation completes.
333    #[cfg(test)]
334    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
335        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
336    }
337
338    /// Freeze one planner-owned route profile from accepted schema authority.
339    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
340        &mut self,
341        schema_info: &SchemaInfo,
342    ) {
343        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
344    }
345
346    /// Freeze model-only executor metadata after logical/access planning completes.
347    #[cfg(test)]
348    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
349        &mut self,
350        model: &EntityModel,
351    ) -> Result<(), InternalError> {
352        self.finalize_static_execution_planning_contract_for_model_with_schema(
353            model,
354            SchemaInfo::cached_for_generated_entity_model(model),
355        )
356    }
357
358    /// Freeze planner-owned executor metadata with explicit schema authority.
359    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
360        &mut self,
361        model: &EntityModel,
362        schema_info: &SchemaInfo,
363    ) -> Result<(), InternalError> {
364        self.static_execution_planning_contract = Some(
365            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
366        );
367
368        Ok(())
369    }
370
371    /// Build one immutable execution-shape signature contract for runtime layers.
372    #[must_use]
373    pub(in crate::db) fn execution_shape_signature(
374        &self,
375        entity_path: &'static str,
376    ) -> ExecutionShapeSignature {
377        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
378    }
379
380    /// Return whether the chosen access contract fully satisfies the current
381    /// scalar query predicate without any additional post-access filtering.
382    #[must_use]
383    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
384        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
385            return self.scalar_plan().predicate.is_some()
386                && !static_contract
387                    .residual_filter_contract
388                    .has_residual_filter();
389        }
390
391        derive_predicate_fully_satisfied_by_access_contract(self)
392    }
393
394    /// Borrow the planner-frozen compiled scalar projection program.
395    #[must_use]
396    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
397        self.static_execution_planning_contract()
398            .scalar_projection_plan
399            .as_deref()
400    }
401
402    /// Return whether planner-owned static execution metadata has already been frozen.
403    #[must_use]
404    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
405        self.static_execution_planning_contract.is_some()
406    }
407
408    /// Borrow the planner-frozen ordered primary-key field names.
409    #[must_use]
410    pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
411        self.static_execution_planning_contract()
412            .primary_key_names
413            .iter()
414            .map(String::as_str)
415            .collect()
416    }
417
418    /// Borrow the planner-frozen projection slot reachability set.
419    #[must_use]
420    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
421        self.static_execution_planning_contract()
422            .projection_referenced_slots
423            .as_slice()
424    }
425
426    /// Borrow the planner-frozen mask for direct projected output slots.
427    #[must_use]
428    #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
429    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
430        self.static_execution_planning_contract()
431            .projected_slot_mask
432            .as_slice()
433    }
434
435    /// Return whether projection remains the full model-identity field list.
436    #[must_use]
437    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
438        self.static_execution_planning_contract()
439            .projection_is_model_identity
440    }
441
442    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
443    #[must_use]
444    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
445        self.static_execution_planning_contract()
446            .order_referenced_slots
447            .as_deref()
448    }
449
450    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
451    #[must_use]
452    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
453        self.static_execution_planning_contract()
454            .resolved_order
455            .as_ref()
456    }
457
458    /// Borrow the planner-frozen access slot map used by index predicate compilation.
459    #[must_use]
460    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
461        self.static_execution_planning_contract()
462            .slot_map
463            .as_deref()
464    }
465
466    /// Borrow grouped aggregate execution specs already resolved during static planning.
467    #[must_use]
468    pub(in crate::db) fn grouped_aggregate_execution_specs(
469        &self,
470    ) -> Option<&[GroupedAggregateExecutionSpec]> {
471        self.static_execution_planning_contract()
472            .grouped_aggregate_execution_specs
473            .as_deref()
474    }
475
476    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
477    #[must_use]
478    pub(in crate::db) const fn grouped_distinct_execution_strategy(
479        &self,
480    ) -> Option<&GroupedDistinctExecutionStrategy> {
481        self.static_execution_planning_contract()
482            .grouped_distinct_execution_strategy
483            .as_ref()
484    }
485
486    /// Borrow the frozen projection semantic shape without reopening model ownership.
487    #[must_use]
488    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
489        &self.static_execution_planning_contract().projection_spec
490    }
491
492    /// Borrow the frozen direct projection slots without reopening model ownership.
493    #[must_use]
494    #[cfg(any(test, feature = "sql"))]
495    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
496        self.static_execution_planning_contract()
497            .projection_direct_slots
498            .as_deref()
499    }
500
501    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
502    #[must_use]
503    #[cfg(any(test, feature = "sql"))]
504    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
505        self.static_execution_planning_contract()
506            .projection_data_row_direct_slots
507            .as_deref()
508    }
509
510    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
511    #[must_use]
512    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
513        self.static_execution_planning_contract()
514            .index_compile_targets
515            .as_deref()
516    }
517
518    const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
519        self.static_execution_planning_contract
520            .as_ref()
521            .expect("query semantics invariant")
522    }
523}
524
525fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
526    match access {
527        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
528            Some(DistinctExecutionStrategy::PreOrdered)
529        }
530        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
531            Some(DistinctExecutionStrategy::HashMaterialize)
532        }
533        AccessPlan::Path(_) => None,
534    }
535}
536
537fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
538    let is_grouped_safe = plan
539        .grouped_plan()
540        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
541
542    ContinuationPolicy::new(
543        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
544        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
545        is_grouped_safe,
546    )
547}
548
549/// Project one planner-owned route profile from the finalized logical+access plan.
550#[must_use]
551#[cfg(test)]
552pub(in crate::db) fn project_planner_route_profile_for_model(
553    model: &EntityModel,
554    plan: &AccessPlannedQuery,
555) -> PlannerRouteProfile {
556    let primary_key_names = ordered_primary_key_names(model);
557    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
558        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
559    });
560
561    PlannerRouteProfile::new(
562        derive_continuation_policy_validated(plan),
563        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
564        secondary_order_contract,
565    )
566}
567
568/// Project one planner-owned route profile from accepted schema authority.
569#[must_use]
570pub(in crate::db) fn project_planner_route_profile_for_schema(
571    schema_info: &SchemaInfo,
572    plan: &AccessPlannedQuery,
573) -> PlannerRouteProfile {
574    let primary_key_names = primary_key_names_from_schema(schema_info);
575    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
576        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
577    });
578
579    PlannerRouteProfile::new(
580        derive_continuation_policy_validated(plan),
581        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
582        secondary_order_contract,
583    )
584}
585
586fn project_static_execution_planning_contract_for_model(
587    model: &EntityModel,
588    schema_info: &SchemaInfo,
589    plan: &AccessPlannedQuery,
590) -> Result<StaticExecutionPlanningContract, InternalError> {
591    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
592    let execution_preparation_predicate = plan.execution_preparation_predicate();
593    let residual_filter_predicate = derive_residual_filter_predicate(plan);
594    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
595    let execution_preparation_compiled_predicate =
596        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
597    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
598        schema_info,
599        residual_filter_expr.as_ref(),
600        residual_filter_predicate.as_ref(),
601    )?;
602    let residual_filter_contract = ResidualFilterContract::new(
603        residual_filter_expr,
604        residual_filter_predicate,
605        effective_runtime_filter_program,
606    );
607    let predicate_pushdown_diagnostics = PredicatePushdownDiagnostics::from_plan(
608        plan.scalar_plan().filter_expr.is_some(),
609        plan.scalar_plan().predicate_covers_filter_expr,
610        plan.scalar_plan().predicate.as_ref(),
611        &plan.access,
612        residual_filter_contract.shape(),
613    );
614    let scalar_projection_plan = if plan.grouped_plan().is_none() {
615        Some(
616            compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
617                .ok_or_else(InternalError::query_executor_invariant)?
618                .iter()
619                .map(CompiledExpr::compile)
620                .collect(),
621        )
622    } else {
623        None
624    };
625    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
626        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
627    let projection_direct_slots = lower_direct_projection_slots_with_schema(
628        model,
629        schema_info,
630        &plan.logical,
631        &plan.projection_selection,
632    );
633    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
634        model,
635        schema_info,
636        &plan.logical,
637        &plan.projection_selection,
638    );
639    let projection_referenced_slots =
640        projection_spec.referenced_slots_for_schema(model, schema_info)?;
641    let projected_slot_mask =
642        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
643    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
644    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
645    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
646    let slot_map = slot_map_for_schema_plan(schema_info, plan);
647    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
648
649    Ok(StaticExecutionPlanningContract {
650        primary_key_names: schema_info.primary_key_names().to_vec(),
651        projection_spec,
652        execution_preparation_predicate,
653        execution_preparation_compiled_predicate,
654        residual_filter_contract,
655        predicate_pushdown_diagnostics,
656        scalar_projection_plan,
657        grouped_aggregate_execution_specs,
658        grouped_distinct_execution_strategy,
659        projection_direct_slots,
660        projection_data_row_direct_slots,
661        projection_referenced_slots,
662        projected_slot_mask,
663        projection_is_model_identity,
664        resolved_order,
665        order_referenced_slots,
666        slot_map,
667        index_compile_targets,
668    })
669}
670
671#[cfg(test)]
672fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
673    model.primary_key_names()
674}
675
676fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
677    schema_info
678        .primary_key_names()
679        .iter()
680        .map(String::as_str)
681        .collect()
682}
683
684// Compile the executor-owned residual scalar filter contract once from the
685// planner-derived residual artifacts so runtime never has to rediscover
686// residual presence or shape from semantic/filter/pushdown state.
687fn compile_effective_runtime_filter_program(
688    schema_info: &SchemaInfo,
689    residual_filter_expr: Option<&Expr>,
690    residual_filter_predicate: Option<&Predicate>,
691) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
692    // Keep the existing predicate fast path when the residual semantics still
693    // fit the derived predicate contract. The expression-owned lane is only
694    // needed once pushdown loses semantic coverage and a residual predicate no
695    // longer exists.
696    if let Some(predicate) = residual_filter_predicate {
697        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
698            PredicateProgram::compile_with_schema_info(schema_info, predicate),
699        )));
700    }
701
702    if let Some(filter_expr) = residual_filter_expr {
703        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
704            .ok_or_else(InternalError::query_invalid_logical_plan)?;
705
706        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
707            CompiledExpr::compile(&compiled),
708        )));
709    }
710
711    Ok(None)
712}
713
714// Derive the executor-preparation predicate once from the selected access path.
715// This strips only filtered-index guard clauses while preserving access-bound
716// equalities that still matter to preparation/explain consumers.
717fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
718    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
719
720    match plan.access.selected_index_contract() {
721        Some(index) => {
722            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
723        }
724        None => Some(query_predicate.clone()),
725    }
726}
727
728// Derive the final residual predicate once from the already-filtered
729// preparation predicate plus any equality bounds guaranteed by the concrete
730// access path.
731fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
732    let filtered_residual = derive_execution_preparation_predicate(plan);
733    let filtered_residual = filtered_residual.as_ref()?;
734
735    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
736}
737
738// Derive the explicit residual semantic expression once for finalized plans.
739// The residual expression remains the planner-owned semantic filter when any
740// runtime filtering still survives access satisfaction.
741fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
742    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
743    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
744        return None;
745    }
746
747    Some(filter_expr.clone())
748}
749
750// Derive the explicit residual semantic expression during finalization using
751// the trusted entity schema so compare-family literal normalization matches the
752// planner-owned predicate contract before residual ownership is decided.
753fn derive_residual_filter_expr_for_model(
754    model: &EntityModel,
755    plan: &AccessPlannedQuery,
756) -> Option<Expr> {
757    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
758    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
759        return None;
760    }
761
762    Some(filter_expr.clone())
763}
764
765// Return whether any residual filtering survives after access planning. This
766// helper exists only for pre-finalization assembly; finalized plans must read
767// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
768fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
769    match (
770        plan.scalar_plan().filter_expr.as_ref(),
771        plan.scalar_plan().predicate.as_ref(),
772    ) {
773        (None, None) => false,
774        (Some(_), None) => true,
775        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
776    }
777}
778
779// Return true when the planner-owned predicate contract is fully satisfied by
780// access planning and no semantic residual filter expression survives.
781fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
782    plan.scalar_plan().predicate.is_some()
783        && derive_residual_filter_predicate(plan).is_none()
784        && derive_residual_filter_expr(plan).is_none()
785}
786
787// Return true when the semantic filter expression is entirely represented by
788// the planner-owned predicate contract and the chosen access path satisfies
789// that predicate without any runtime remainder.
790const fn derive_semantic_filter_fully_satisfied_by_access_contract(
791    plan: &AccessPlannedQuery,
792) -> bool {
793    plan.scalar_plan().filter_expr.is_some()
794        && plan.scalar_plan().predicate.is_some()
795        && plan.scalar_plan().predicate_covers_filter_expr
796}
797
798// Return true when finalized planning can prove that the semantic filter
799// expression is completely represented by the planner-owned predicate contract
800// after aligning compare literals through the trusted entity schema.
801const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
802    _model: &EntityModel,
803    plan: &AccessPlannedQuery,
804) -> bool {
805    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
806}
807
808// Compile one optional planner-frozen predicate program while keeping the
809// static planning assembly path free of repeated `Option` mapping boilerplate.
810fn compile_optional_predicate(
811    schema_info: &SchemaInfo,
812    predicate: Option<&Predicate>,
813) -> Option<PredicateProgram> {
814    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
815}
816
817// Resolve the grouped-only static planning semantics bundle once so grouped
818// aggregate execution specs and grouped DISTINCT strategy stay derived under
819// one shared grouped-plan branch.
820fn resolve_grouped_static_planning_semantics(
821    schema_info: &SchemaInfo,
822    plan: &AccessPlannedQuery,
823    projection_spec: &ProjectionSpec,
824) -> Result<
825    (
826        Option<Vec<GroupedAggregateExecutionSpec>>,
827        Option<GroupedDistinctExecutionStrategy>,
828    ),
829    InternalError,
830> {
831    let Some(grouped) = plan.grouped_plan() else {
832        return Ok((None, None));
833    };
834
835    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
836        projection_spec,
837        grouped.group.group_fields.as_slice(),
838        grouped.group.aggregates.as_slice(),
839    )?;
840    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
841
842    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
843        schema_info,
844        aggregate_specs.as_slice(),
845    )?);
846    let grouped_distinct_execution_strategy = Some(
847        resolved_grouped_distinct_execution_strategy_with_schema_info(
848            schema_info,
849            grouped.group.group_fields.as_slice(),
850            grouped.group.aggregates.as_slice(),
851            grouped.having_expr.as_ref(),
852        )?,
853    );
854
855    Ok((
856        grouped_aggregate_execution_specs,
857        grouped_distinct_execution_strategy,
858    ))
859}
860
861fn extend_grouped_having_aggregate_specs(
862    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
863    grouped: &GroupPlan,
864) -> Result<(), InternalError> {
865    if let Some(having_expr) = grouped.having_expr.as_ref() {
866        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
867    }
868
869    Ok(())
870}
871
872fn collect_grouped_having_expr_aggregate_specs(
873    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
874    expr: &Expr,
875) -> Result<(), InternalError> {
876    if !expr.contains_aggregate() {
877        return Ok(());
878    }
879
880    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
881        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
882
883        if aggregate_specs
884            .iter()
885            .all(|current| current != &aggregate_spec)
886        {
887            aggregate_specs.push(aggregate_spec);
888        }
889
890        Ok(())
891    })
892}
893
894fn projected_slot_mask_for_spec(
895    model: &EntityModel,
896    direct_projection_slots: Option<&[usize]>,
897) -> Vec<bool> {
898    let schema_slot_len = direct_projection_slots
899        .and_then(|slots| slots.iter().copied().max())
900        .map_or(0, |slot| slot.saturating_add(1));
901    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
902
903    let Some(direct_projection_slots) = direct_projection_slots else {
904        return projected_slots;
905    };
906
907    for slot in direct_projection_slots.iter().copied() {
908        if let Some(projected) = projected_slots.get_mut(slot) {
909            *projected = true;
910        }
911    }
912
913    projected_slots
914}
915
916fn resolved_order_for_plan(
917    schema_info: &SchemaInfo,
918    plan: &AccessPlannedQuery,
919) -> Result<Option<ResolvedOrder>, InternalError> {
920    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
921        return Ok(None);
922    }
923
924    let Some(order) = plan.scalar_plan().order.as_ref() else {
925        return Ok(None);
926    };
927
928    let mut fields = Vec::with_capacity(order.fields.len());
929    for term in &order.fields {
930        fields.push(ResolvedOrderField::new(
931            resolved_order_value_source_for_term(schema_info, term)?,
932            term.direction(),
933        ));
934    }
935
936    Ok(Some(ResolvedOrder::new(fields)))
937}
938
939fn resolved_order_value_source_for_term(
940    schema_info: &SchemaInfo,
941    term: &crate::db::query::plan::OrderTerm,
942) -> Result<ResolvedOrderValueSource, InternalError> {
943    if term.direct_field().is_none() {
944        let rendered = term.rendered_label();
945        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
946        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
947            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
948
949        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
950            &compiled,
951        )));
952    }
953
954    let field = term.direct_field().expect("query semantics invariant");
955    let slot = resolve_required_schema_slot(
956        schema_info,
957        field,
958        InternalError::query_invalid_logical_plan,
959    )?;
960
961    Ok(ResolvedOrderValueSource::direct_field(slot))
962}
963
964fn validate_resolved_order_expr_fields(
965    schema_info: &SchemaInfo,
966    expr: &Expr,
967    rendered: &str,
968) -> Result<(), InternalError> {
969    expr.try_for_each_tree_expr(&mut |node| match node {
970        Expr::Field(field_id) => resolve_required_schema_slot(
971            schema_info,
972            field_id.as_str(),
973            InternalError::query_invalid_logical_plan,
974        )
975        .map(|_| ()),
976        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
977        #[cfg(test)]
978        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
979        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
980        _ => Ok(()),
981    })
982}
983
984// Resolve one schema-authoritative field slot while keeping planner
985// invalid-logical-plan error construction at the callsite that owns the
986// diagnostic wording.
987fn resolve_required_schema_slot<F>(
988    schema_info: &SchemaInfo,
989    field: &str,
990    invalid_plan_error: F,
991) -> Result<usize, InternalError>
992where
993    F: FnOnce() -> InternalError,
994{
995    schema_info
996        .field_slot_index(field)
997        .ok_or_else(invalid_plan_error)
998}
999
1000// Keep the scalar-order expression seam violation text under one helper so the
1001// parse validation and compile validation paths do not drift.
1002fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
1003    InternalError::query_invalid_logical_plan()
1004}
1005
1006// Keep one stable executor-facing slot list for grouped order terms after the
1007// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
1008// now consumes this same referenced-slot contract instead of re-deriving order
1009// sources from planner strategy at runtime.
1010fn order_referenced_slots_for_resolved_order(
1011    resolved_order: Option<&ResolvedOrder>,
1012) -> Option<Vec<usize>> {
1013    Some(resolved_order?.referenced_slots())
1014}
1015
1016fn slot_map_for_schema_plan(
1017    schema_info: &SchemaInfo,
1018    plan: &AccessPlannedQuery,
1019) -> Option<Vec<usize>> {
1020    let executable = plan.access.executable_contract();
1021
1022    resolved_index_slots_for_access_path(schema_info, &executable)
1023}
1024
1025fn resolved_index_slots_for_access_path(
1026    schema_info: &SchemaInfo,
1027    access: &ExecutableAccessPlan<'_, crate::value::Value>,
1028) -> Option<Vec<usize>> {
1029    let path = access.as_path()?;
1030    let path_facts = path.shape_facts();
1031    let key_items = path_facts.index_key_items_for_slot_map()?;
1032    let mut slots = Vec::new();
1033
1034    match key_items.key_items() {
1035        SemanticIndexKeyItemsRef::Fields(fields) => {
1036            slots.reserve(fields.len());
1037            for field_name in fields {
1038                let slot = schema_info.field_slot_index(field_name)?;
1039                slots.push(slot);
1040            }
1041        }
1042        SemanticIndexKeyItemsRef::Accepted(items) => {
1043            slots.reserve(items.len());
1044            for key_item in items {
1045                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1046                slots.push(slot);
1047            }
1048        }
1049        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1050            slots.reserve(fields.len());
1051            for &field_name in fields {
1052                let slot = schema_info.field_slot_index(field_name)?;
1053                slots.push(slot);
1054            }
1055        }
1056        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1057            slots.reserve(items.len());
1058            for key_item in items {
1059                let slot = schema_info.field_slot_index(key_item.field())?;
1060                slots.push(slot);
1061            }
1062        }
1063    }
1064
1065    Some(slots)
1066}
1067
1068fn index_compile_targets_for_schema_plan(
1069    schema_info: &SchemaInfo,
1070    plan: &AccessPlannedQuery,
1071) -> Option<Vec<IndexCompileTarget>> {
1072    let executable = plan.access.executable_contract();
1073    let path = executable.as_path()?;
1074    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1075    let mut targets = Vec::new();
1076
1077    match key_items.key_items() {
1078        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1079            return None;
1080        }
1081        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1082            for (component_index, &field_name) in fields.iter().enumerate() {
1083                let field_slot = schema_info.field_slot_index(field_name)?;
1084                targets.push(IndexCompileTarget {
1085                    component_index,
1086                    field_slot,
1087                    key_item: IndexKeyItem::Field(field_name),
1088                });
1089            }
1090        }
1091        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1092            for (component_index, &key_item) in items.iter().enumerate() {
1093                let field_slot = schema_info.field_slot_index(key_item.field())?;
1094                targets.push(IndexCompileTarget {
1095                    component_index,
1096                    field_slot,
1097                    key_item,
1098                });
1099            }
1100        }
1101    }
1102
1103    Some(targets)
1104}