Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9    db::{
10        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12        query::plan::{
13            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
17            ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
18            derive_logical_pushdown_eligibility,
19            expr::{
20                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
21                compile_scalar_projection_plan_with_schema,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy,
25            lower_data_row_direct_projection_slots_with_schema,
26            lower_direct_projection_slots_with_schema, lower_projection_identity,
27            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
28            residual_query_predicate_after_filtered_access_contract,
29            resolved_grouped_distinct_execution_strategy_with_schema_info,
30        },
31        schema::SchemaInfo,
32    },
33    error::InternalError,
34    model::{
35        entity::EntityModel,
36        index::{IndexKeyItem, IndexKeyItemsRef},
37    },
38};
39
40impl QueryMode {
41    /// True if this mode represents a load intent.
42    #[must_use]
43    pub const fn is_load(&self) -> bool {
44        match self {
45            Self::Load(_) => true,
46            Self::Delete(_) => false,
47        }
48    }
49
50    /// True if this mode represents a delete intent.
51    #[must_use]
52    pub const fn is_delete(&self) -> bool {
53        match self {
54            Self::Delete(_) => true,
55            Self::Load(_) => false,
56        }
57    }
58}
59
60impl LogicalPlan {
61    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
62    #[must_use]
63    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64        match self {
65            Self::Scalar(plan) => plan,
66            Self::Grouped(plan) => &plan.scalar,
67        }
68    }
69
70    /// Borrow scalar semantic fields mutably across logical variants for tests.
71    #[must_use]
72    #[cfg(test)]
73    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74        match self {
75            Self::Scalar(plan) => plan,
76            Self::Grouped(plan) => &mut plan.scalar,
77        }
78    }
79
80    /// Test-only shorthand for explicit scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84        self.scalar_semantics()
85    }
86
87    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
88    #[must_use]
89    #[cfg(test)]
90    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91        self.scalar_semantics_mut()
92    }
93}
94
95impl AccessPlannedQuery {
96    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
97    #[must_use]
98    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99        self.logical.scalar_semantics()
100    }
101
102    /// Borrow scalar missing-row consistency without exposing the full scalar
103    /// plan to executor owners that only need row-presence policy.
104    #[must_use]
105    #[cfg(any(test, feature = "sql"))]
106    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
107        self.scalar_plan().consistency
108    }
109
110    /// Borrow scalar semantic fields mutably across logical variants for tests.
111    #[must_use]
112    #[cfg(test)]
113    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
114        self.logical.scalar_semantics_mut()
115    }
116
117    /// Test-only shorthand for explicit scalar plan borrowing.
118    #[must_use]
119    #[cfg(test)]
120    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
121        self.scalar_plan()
122    }
123
124    /// Test-only shorthand for explicit mutable scalar plan borrowing.
125    #[must_use]
126    #[cfg(test)]
127    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
128        self.scalar_plan_mut()
129    }
130
131    /// Borrow grouped semantic fields when this plan is grouped.
132    #[must_use]
133    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
134        match &self.logical {
135            LogicalPlan::Scalar(_) => None,
136            LogicalPlan::Grouped(plan) => Some(plan),
137        }
138    }
139
140    /// Lower this plan into one canonical planner-owned projection semantic spec.
141    #[must_use]
142    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
143        if let Some(static_contract) = &self.static_execution_planning_contract {
144            return static_contract.projection_spec.clone();
145        }
146
147        lower_projection_intent(model, &self.logical, &self.projection_selection)
148    }
149
150    /// Lower this plan into one projection semantic shape for identity hashing.
151    #[must_use]
152    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
153        lower_projection_identity(&self.logical, &self.projection_selection)
154    }
155
156    /// Return the executor-facing predicate after removing only filtered-index
157    /// guard clauses the chosen access path already proves.
158    ///
159    /// This conservative form is used by preparation/explain surfaces that
160    /// still need to see access-bound equalities as index-predicate input.
161    #[must_use]
162    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
163        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
164            return static_contract.execution_preparation_predicate.clone();
165        }
166
167        derive_execution_preparation_predicate(self)
168    }
169
170    /// Return the executor-facing residual predicate after removing any
171    /// filtered-index guard clauses and fixed access-bound equalities already
172    /// guaranteed by the chosen path.
173    #[must_use]
174    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
175        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
176            return static_contract.residual_filter_predicate.clone();
177        }
178
179        derive_residual_filter_predicate(self)
180    }
181
182    /// Return whether one explicit residual predicate survives access
183    /// planning and still participates in residual execution.
184    #[must_use]
185    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
186        self.effective_execution_predicate().is_some()
187    }
188
189    /// Borrow the planner-owned residual scalar filter expression when one
190    /// surviving semantic remainder still requires runtime evaluation.
191    #[must_use]
192    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
193        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
194            return static_contract.residual_filter_expr.as_ref();
195        }
196
197        if !derive_has_residual_filter(self) {
198            return None;
199        }
200
201        self.scalar_plan().filter_expr.as_ref()
202    }
203
204    /// Return whether one explicit residual scalar filter expression survives
205    /// access planning and still requires runtime evaluation.
206    #[must_use]
207    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
208        self.residual_filter_expr().is_some()
209    }
210
211    /// Borrow the planner-compiled execution-preparation predicate program.
212    #[must_use]
213    pub(in crate::db) const fn execution_preparation_compiled_predicate(
214        &self,
215    ) -> Option<&PredicateProgram> {
216        self.static_execution_planning_contract()
217            .execution_preparation_compiled_predicate
218            .as_ref()
219    }
220
221    /// Borrow the planner-compiled effective runtime predicate program.
222    #[must_use]
223    pub(in crate::db) const fn effective_runtime_compiled_predicate(
224        &self,
225    ) -> Option<&PredicateProgram> {
226        match self
227            .static_execution_planning_contract()
228            .effective_runtime_filter_program
229            .as_ref()
230        {
231            Some(program) => program.predicate_program(),
232            None => None,
233        }
234    }
235
236    /// Borrow the planner-compiled effective runtime scalar filter expression.
237    #[cfg(test)]
238    #[must_use]
239    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
240        &self,
241    ) -> Option<&CompiledExpr> {
242        match self
243            .static_execution_planning_contract()
244            .effective_runtime_filter_program
245            .as_ref()
246        {
247            Some(program) => program.expression_filter(),
248            None => None,
249        }
250    }
251
252    /// Borrow the planner-frozen effective runtime scalar filter program.
253    #[must_use]
254    pub(in crate::db) const fn effective_runtime_filter_program(
255        &self,
256    ) -> Option<&EffectiveRuntimeFilterProgram> {
257        self.static_execution_planning_contract()
258            .effective_runtime_filter_program
259            .as_ref()
260    }
261
262    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
263    #[must_use]
264    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
265        if !self.scalar_plan().distinct {
266            return DistinctExecutionStrategy::None;
267        }
268
269        // DISTINCT on duplicate-safe single-path access shapes is a planner
270        // no-op for runtime dedup mechanics. Composite shapes can surface
271        // duplicate keys and therefore retain explicit dedup execution.
272        match distinct_runtime_dedup_strategy(&self.access) {
273            Some(strategy) => strategy,
274            None => DistinctExecutionStrategy::None,
275        }
276    }
277
278    /// Freeze one planner-owned route profile after model validation completes.
279    #[cfg(test)]
280    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
281        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
282    }
283
284    /// Freeze one planner-owned route profile from accepted schema authority.
285    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
286        &mut self,
287        schema_info: &SchemaInfo,
288    ) {
289        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
290    }
291
292    /// Freeze model-only executor metadata after logical/access planning completes.
293    #[cfg(test)]
294    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
295        &mut self,
296        model: &EntityModel,
297    ) -> Result<(), InternalError> {
298        self.finalize_static_execution_planning_contract_for_model_with_schema(
299            model,
300            SchemaInfo::cached_for_generated_entity_model(model),
301        )
302    }
303
304    /// Freeze planner-owned executor metadata with explicit schema authority.
305    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
306        &mut self,
307        model: &EntityModel,
308        schema_info: &SchemaInfo,
309    ) -> Result<(), InternalError> {
310        self.static_execution_planning_contract = Some(
311            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
312        );
313
314        Ok(())
315    }
316
317    /// Build one immutable execution-shape signature contract for runtime layers.
318    #[must_use]
319    pub(in crate::db) fn execution_shape_signature(
320        &self,
321        entity_path: &'static str,
322    ) -> ExecutionShapeSignature {
323        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
324    }
325
326    /// Return whether the chosen access contract fully satisfies the current
327    /// scalar query predicate without any additional post-access filtering.
328    #[must_use]
329    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
330        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
331            return self.scalar_plan().predicate.is_some()
332                && static_contract.residual_filter_predicate.is_none()
333                && static_contract.residual_filter_expr.is_none();
334        }
335
336        derive_predicate_fully_satisfied_by_access_contract(self)
337    }
338
339    /// Borrow the planner-frozen compiled scalar projection program.
340    #[must_use]
341    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
342        self.static_execution_planning_contract()
343            .scalar_projection_plan
344            .as_deref()
345    }
346
347    /// Return whether planner-owned static execution metadata has already been frozen.
348    #[must_use]
349    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
350        self.static_execution_planning_contract.is_some()
351    }
352
353    /// Borrow the planner-frozen ordered primary-key field names.
354    #[must_use]
355    pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
356        self.static_execution_planning_contract()
357            .primary_key_names
358            .iter()
359            .map(String::as_str)
360            .collect()
361    }
362
363    /// Borrow the planner-frozen projection slot reachability set.
364    #[must_use]
365    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
366        self.static_execution_planning_contract()
367            .projection_referenced_slots
368            .as_slice()
369    }
370
371    /// Borrow the planner-frozen mask for direct projected output slots.
372    #[must_use]
373    #[cfg(any(test, feature = "diagnostics"))]
374    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
375        self.static_execution_planning_contract()
376            .projected_slot_mask
377            .as_slice()
378    }
379
380    /// Return whether projection remains the full model-identity field list.
381    #[must_use]
382    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
383        self.static_execution_planning_contract()
384            .projection_is_model_identity
385    }
386
387    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
388    #[must_use]
389    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
390        self.static_execution_planning_contract()
391            .order_referenced_slots
392            .as_deref()
393    }
394
395    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
396    #[must_use]
397    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
398        self.static_execution_planning_contract()
399            .resolved_order
400            .as_ref()
401    }
402
403    /// Borrow the planner-frozen access slot map used by index predicate compilation.
404    #[must_use]
405    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
406        self.static_execution_planning_contract()
407            .slot_map
408            .as_deref()
409    }
410
411    /// Borrow grouped aggregate execution specs already resolved during static planning.
412    #[must_use]
413    pub(in crate::db) fn grouped_aggregate_execution_specs(
414        &self,
415    ) -> Option<&[GroupedAggregateExecutionSpec]> {
416        self.static_execution_planning_contract()
417            .grouped_aggregate_execution_specs
418            .as_deref()
419    }
420
421    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
422    #[must_use]
423    pub(in crate::db) const fn grouped_distinct_execution_strategy(
424        &self,
425    ) -> Option<&GroupedDistinctExecutionStrategy> {
426        self.static_execution_planning_contract()
427            .grouped_distinct_execution_strategy
428            .as_ref()
429    }
430
431    /// Borrow the frozen projection semantic shape without reopening model ownership.
432    #[must_use]
433    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
434        &self.static_execution_planning_contract().projection_spec
435    }
436
437    /// Borrow the frozen direct projection slots without reopening model ownership.
438    #[must_use]
439    #[cfg(any(test, feature = "sql"))]
440    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
441        self.static_execution_planning_contract()
442            .projection_direct_slots
443            .as_deref()
444    }
445
446    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
447    #[must_use]
448    #[cfg(any(test, feature = "sql"))]
449    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
450        self.static_execution_planning_contract()
451            .projection_data_row_direct_slots
452            .as_deref()
453    }
454
455    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
456    #[must_use]
457    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
458        self.static_execution_planning_contract()
459            .index_compile_targets
460            .as_deref()
461    }
462
463    const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
464        self.static_execution_planning_contract
465            .as_ref()
466            .expect("query semantics invariant")
467    }
468}
469
470fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
471    match access {
472        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
473            Some(DistinctExecutionStrategy::PreOrdered)
474        }
475        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
476            Some(DistinctExecutionStrategy::HashMaterialize)
477        }
478        AccessPlan::Path(_) => None,
479    }
480}
481
482fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
483    let is_grouped_safe = plan
484        .grouped_plan()
485        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
486
487    ContinuationPolicy::new(
488        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
489        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
490        is_grouped_safe,
491    )
492}
493
494/// Project one planner-owned route profile from the finalized logical+access plan.
495#[must_use]
496#[cfg(test)]
497pub(in crate::db) fn project_planner_route_profile_for_model(
498    model: &EntityModel,
499    plan: &AccessPlannedQuery,
500) -> PlannerRouteProfile {
501    let primary_key_names = ordered_primary_key_names(model);
502    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
503        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
504    });
505
506    PlannerRouteProfile::new(
507        derive_continuation_policy_validated(plan),
508        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
509        secondary_order_contract,
510    )
511}
512
513/// Project one planner-owned route profile from accepted schema authority.
514#[must_use]
515pub(in crate::db) fn project_planner_route_profile_for_schema(
516    schema_info: &SchemaInfo,
517    plan: &AccessPlannedQuery,
518) -> PlannerRouteProfile {
519    let primary_key_names = primary_key_names_from_schema(schema_info);
520    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
521        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
522    });
523
524    PlannerRouteProfile::new(
525        derive_continuation_policy_validated(plan),
526        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
527        secondary_order_contract,
528    )
529}
530
531fn project_static_execution_planning_contract_for_model(
532    model: &EntityModel,
533    schema_info: &SchemaInfo,
534    plan: &AccessPlannedQuery,
535) -> Result<StaticExecutionPlanningContract, InternalError> {
536    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
537    let execution_preparation_predicate = plan.execution_preparation_predicate();
538    let residual_filter_predicate = derive_residual_filter_predicate(plan);
539    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
540    let execution_preparation_compiled_predicate =
541        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
542    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
543        schema_info,
544        residual_filter_expr.as_ref(),
545        residual_filter_predicate.as_ref(),
546    )?;
547    let scalar_projection_plan = if plan.grouped_plan().is_none() {
548        Some(
549            compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
550                .ok_or_else(InternalError::query_executor_invariant)?
551                .iter()
552                .map(CompiledExpr::compile)
553                .collect(),
554        )
555    } else {
556        None
557    };
558    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
559        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
560    let projection_direct_slots = lower_direct_projection_slots_with_schema(
561        model,
562        schema_info,
563        &plan.logical,
564        &plan.projection_selection,
565    );
566    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
567        model,
568        schema_info,
569        &plan.logical,
570        &plan.projection_selection,
571    );
572    let projection_referenced_slots =
573        projection_spec.referenced_slots_for_schema(model, schema_info)?;
574    let projected_slot_mask =
575        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
576    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
577    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
578    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
579    let slot_map = slot_map_for_schema_plan(schema_info, plan);
580    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
581
582    Ok(StaticExecutionPlanningContract {
583        primary_key_names: schema_info.primary_key_names().to_vec(),
584        projection_spec,
585        execution_preparation_predicate,
586        residual_filter_expr,
587        residual_filter_predicate,
588        execution_preparation_compiled_predicate,
589        effective_runtime_filter_program,
590        scalar_projection_plan,
591        grouped_aggregate_execution_specs,
592        grouped_distinct_execution_strategy,
593        projection_direct_slots,
594        projection_data_row_direct_slots,
595        projection_referenced_slots,
596        projected_slot_mask,
597        projection_is_model_identity,
598        resolved_order,
599        order_referenced_slots,
600        slot_map,
601        index_compile_targets,
602    })
603}
604
605#[cfg(test)]
606fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
607    model.primary_key_names()
608}
609
610fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
611    schema_info
612        .primary_key_names()
613        .iter()
614        .map(String::as_str)
615        .collect()
616}
617
618// Compile the executor-owned residual scalar filter contract once from the
619// planner-derived residual artifacts so runtime never has to rediscover
620// residual presence or shape from semantic/filter/pushdown state.
621fn compile_effective_runtime_filter_program(
622    schema_info: &SchemaInfo,
623    residual_filter_expr: Option<&Expr>,
624    residual_filter_predicate: Option<&Predicate>,
625) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
626    // Keep the existing predicate fast path when the residual semantics still
627    // fit the derived predicate contract. The expression-owned lane is only
628    // needed once pushdown loses semantic coverage and a residual predicate no
629    // longer exists.
630    if let Some(predicate) = residual_filter_predicate {
631        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
632            PredicateProgram::compile_with_schema_info(schema_info, predicate),
633        )));
634    }
635
636    if let Some(filter_expr) = residual_filter_expr {
637        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
638            .ok_or_else(InternalError::query_invalid_logical_plan)?;
639
640        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
641            CompiledExpr::compile(&compiled),
642        )));
643    }
644
645    Ok(None)
646}
647
648// Derive the executor-preparation predicate once from the selected access path.
649// This strips only filtered-index guard clauses while preserving access-bound
650// equalities that still matter to preparation/explain consumers.
651fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
652    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
653
654    match plan.access.selected_index_contract() {
655        Some(index) => {
656            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
657        }
658        None => Some(query_predicate.clone()),
659    }
660}
661
662// Derive the final residual predicate once from the already-filtered
663// preparation predicate plus any equality bounds guaranteed by the concrete
664// access path.
665fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
666    let filtered_residual = derive_execution_preparation_predicate(plan);
667    let filtered_residual = filtered_residual.as_ref()?;
668
669    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
670}
671
672// Derive the explicit residual semantic expression once for finalized plans.
673// The residual expression remains the planner-owned semantic filter when any
674// runtime filtering still survives access satisfaction.
675fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
676    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
677    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
678        return None;
679    }
680
681    Some(filter_expr.clone())
682}
683
684// Derive the explicit residual semantic expression during finalization using
685// the trusted entity schema so compare-family literal normalization matches the
686// planner-owned predicate contract before residual ownership is decided.
687fn derive_residual_filter_expr_for_model(
688    model: &EntityModel,
689    plan: &AccessPlannedQuery,
690) -> Option<Expr> {
691    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
692    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
693        return None;
694    }
695
696    Some(filter_expr.clone())
697}
698
699// Return whether any residual filtering survives after access planning. This
700// helper exists only for pre-finalization assembly; finalized plans must read
701// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
702fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
703    match (
704        plan.scalar_plan().filter_expr.as_ref(),
705        plan.scalar_plan().predicate.as_ref(),
706    ) {
707        (None, None) => false,
708        (Some(_), None) => true,
709        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
710    }
711}
712
713// Return true when the planner-owned predicate contract is fully satisfied by
714// access planning and no semantic residual filter expression survives.
715fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
716    plan.scalar_plan().predicate.is_some()
717        && derive_residual_filter_predicate(plan).is_none()
718        && derive_residual_filter_expr(plan).is_none()
719}
720
721// Return true when the semantic filter expression is entirely represented by
722// the planner-owned predicate contract and the chosen access path satisfies
723// that predicate without any runtime remainder.
724const fn derive_semantic_filter_fully_satisfied_by_access_contract(
725    plan: &AccessPlannedQuery,
726) -> bool {
727    plan.scalar_plan().filter_expr.is_some()
728        && plan.scalar_plan().predicate.is_some()
729        && plan.scalar_plan().predicate_covers_filter_expr
730}
731
732// Return true when finalized planning can prove that the semantic filter
733// expression is completely represented by the planner-owned predicate contract
734// after aligning compare literals through the trusted entity schema.
735const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
736    _model: &EntityModel,
737    plan: &AccessPlannedQuery,
738) -> bool {
739    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
740}
741
742// Compile one optional planner-frozen predicate program while keeping the
743// static planning assembly path free of repeated `Option` mapping boilerplate.
744fn compile_optional_predicate(
745    schema_info: &SchemaInfo,
746    predicate: Option<&Predicate>,
747) -> Option<PredicateProgram> {
748    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
749}
750
751// Resolve the grouped-only static planning semantics bundle once so grouped
752// aggregate execution specs and grouped DISTINCT strategy stay derived under
753// one shared grouped-plan branch.
754fn resolve_grouped_static_planning_semantics(
755    schema_info: &SchemaInfo,
756    plan: &AccessPlannedQuery,
757    projection_spec: &ProjectionSpec,
758) -> Result<
759    (
760        Option<Vec<GroupedAggregateExecutionSpec>>,
761        Option<GroupedDistinctExecutionStrategy>,
762    ),
763    InternalError,
764> {
765    let Some(grouped) = plan.grouped_plan() else {
766        return Ok((None, None));
767    };
768
769    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
770        projection_spec,
771        grouped.group.group_fields.as_slice(),
772        grouped.group.aggregates.as_slice(),
773    )?;
774    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
775
776    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
777        schema_info,
778        aggregate_specs.as_slice(),
779    )?);
780    let grouped_distinct_execution_strategy = Some(
781        resolved_grouped_distinct_execution_strategy_with_schema_info(
782            schema_info,
783            grouped.group.group_fields.as_slice(),
784            grouped.group.aggregates.as_slice(),
785            grouped.having_expr.as_ref(),
786        )?,
787    );
788
789    Ok((
790        grouped_aggregate_execution_specs,
791        grouped_distinct_execution_strategy,
792    ))
793}
794
795fn extend_grouped_having_aggregate_specs(
796    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
797    grouped: &GroupPlan,
798) -> Result<(), InternalError> {
799    if let Some(having_expr) = grouped.having_expr.as_ref() {
800        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
801    }
802
803    Ok(())
804}
805
806fn collect_grouped_having_expr_aggregate_specs(
807    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
808    expr: &Expr,
809) -> Result<(), InternalError> {
810    if !expr.contains_aggregate() {
811        return Ok(());
812    }
813
814    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
815        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
816
817        if aggregate_specs
818            .iter()
819            .all(|current| current != &aggregate_spec)
820        {
821            aggregate_specs.push(aggregate_spec);
822        }
823
824        Ok(())
825    })
826}
827
828fn projected_slot_mask_for_spec(
829    model: &EntityModel,
830    direct_projection_slots: Option<&[usize]>,
831) -> Vec<bool> {
832    let schema_slot_len = direct_projection_slots
833        .and_then(|slots| slots.iter().copied().max())
834        .map_or(0, |slot| slot.saturating_add(1));
835    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
836
837    let Some(direct_projection_slots) = direct_projection_slots else {
838        return projected_slots;
839    };
840
841    for slot in direct_projection_slots.iter().copied() {
842        if let Some(projected) = projected_slots.get_mut(slot) {
843            *projected = true;
844        }
845    }
846
847    projected_slots
848}
849
850fn resolved_order_for_plan(
851    schema_info: &SchemaInfo,
852    plan: &AccessPlannedQuery,
853) -> Result<Option<ResolvedOrder>, InternalError> {
854    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
855        return Ok(None);
856    }
857
858    let Some(order) = plan.scalar_plan().order.as_ref() else {
859        return Ok(None);
860    };
861
862    let mut fields = Vec::with_capacity(order.fields.len());
863    for term in &order.fields {
864        fields.push(ResolvedOrderField::new(
865            resolved_order_value_source_for_term(schema_info, term)?,
866            term.direction(),
867        ));
868    }
869
870    Ok(Some(ResolvedOrder::new(fields)))
871}
872
873fn resolved_order_value_source_for_term(
874    schema_info: &SchemaInfo,
875    term: &crate::db::query::plan::OrderTerm,
876) -> Result<ResolvedOrderValueSource, InternalError> {
877    if term.direct_field().is_none() {
878        let rendered = term.rendered_label();
879        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
880        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
881            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
882
883        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
884            &compiled,
885        )));
886    }
887
888    let field = term.direct_field().expect("query semantics invariant");
889    let slot = resolve_required_schema_slot(
890        schema_info,
891        field,
892        InternalError::query_invalid_logical_plan,
893    )?;
894
895    Ok(ResolvedOrderValueSource::direct_field(slot))
896}
897
898fn validate_resolved_order_expr_fields(
899    schema_info: &SchemaInfo,
900    expr: &Expr,
901    rendered: &str,
902) -> Result<(), InternalError> {
903    expr.try_for_each_tree_expr(&mut |node| match node {
904        Expr::Field(field_id) => resolve_required_schema_slot(
905            schema_info,
906            field_id.as_str(),
907            InternalError::query_invalid_logical_plan,
908        )
909        .map(|_| ()),
910        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
911        #[cfg(test)]
912        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
913        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
914        _ => Ok(()),
915    })
916}
917
918// Resolve one schema-authoritative field slot while keeping planner
919// invalid-logical-plan error construction at the callsite that owns the
920// diagnostic wording.
921fn resolve_required_schema_slot<F>(
922    schema_info: &SchemaInfo,
923    field: &str,
924    invalid_plan_error: F,
925) -> Result<usize, InternalError>
926where
927    F: FnOnce() -> InternalError,
928{
929    schema_info
930        .field_slot_index(field)
931        .ok_or_else(invalid_plan_error)
932}
933
934// Keep the scalar-order expression seam violation text under one helper so the
935// parse validation and compile validation paths do not drift.
936fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
937    InternalError::query_invalid_logical_plan()
938}
939
940// Keep one stable executor-facing slot list for grouped order terms after the
941// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
942// now consumes this same referenced-slot contract instead of re-deriving order
943// sources from planner strategy at runtime.
944fn order_referenced_slots_for_resolved_order(
945    resolved_order: Option<&ResolvedOrder>,
946) -> Option<Vec<usize>> {
947    Some(resolved_order?.referenced_slots())
948}
949
950fn slot_map_for_schema_plan(
951    schema_info: &SchemaInfo,
952    plan: &AccessPlannedQuery,
953) -> Option<Vec<usize>> {
954    let executable = plan.access.executable_contract();
955
956    resolved_index_slots_for_access_path(schema_info, &executable)
957}
958
959fn resolved_index_slots_for_access_path(
960    schema_info: &SchemaInfo,
961    access: &ExecutableAccessPlan<'_, crate::value::Value>,
962) -> Option<Vec<usize>> {
963    let path = access.as_path()?;
964    let path_facts = path.shape_facts();
965    let key_items = path_facts.index_key_items_for_slot_map()?;
966    let mut slots = Vec::new();
967
968    match key_items.key_items() {
969        SemanticIndexKeyItemsRef::Fields(fields) => {
970            slots.reserve(fields.len());
971            for field_name in fields {
972                let slot = schema_info.field_slot_index(field_name)?;
973                slots.push(slot);
974            }
975        }
976        SemanticIndexKeyItemsRef::Accepted(items) => {
977            slots.reserve(items.len());
978            for key_item in items {
979                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
980                slots.push(slot);
981            }
982        }
983        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
984            slots.reserve(fields.len());
985            for &field_name in fields {
986                let slot = schema_info.field_slot_index(field_name)?;
987                slots.push(slot);
988            }
989        }
990        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
991            slots.reserve(items.len());
992            for key_item in items {
993                let slot = schema_info.field_slot_index(key_item.field())?;
994                slots.push(slot);
995            }
996        }
997    }
998
999    Some(slots)
1000}
1001
1002fn index_compile_targets_for_schema_plan(
1003    schema_info: &SchemaInfo,
1004    plan: &AccessPlannedQuery,
1005) -> Option<Vec<IndexCompileTarget>> {
1006    let executable = plan.access.executable_contract();
1007    let path = executable.as_path()?;
1008    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1009    let mut targets = Vec::new();
1010
1011    match key_items.key_items() {
1012        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1013            return None;
1014        }
1015        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1016            for (component_index, &field_name) in fields.iter().enumerate() {
1017                let field_slot = schema_info.field_slot_index(field_name)?;
1018                targets.push(IndexCompileTarget {
1019                    component_index,
1020                    field_slot,
1021                    key_item: IndexKeyItem::Field(field_name),
1022                });
1023            }
1024        }
1025        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1026            for (component_index, &key_item) in items.iter().enumerate() {
1027                let field_slot = schema_info.field_slot_index(key_item.field())?;
1028                targets.push(IndexCompileTarget {
1029                    component_index,
1030                    field_slot,
1031                    key_item,
1032                });
1033            }
1034        }
1035    }
1036
1037    Some(targets)
1038}