Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9    db::{
10        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12        query::plan::{
13            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
17            ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
18            derive_logical_pushdown_eligibility,
19            expr::{
20                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
21                compile_scalar_projection_plan_with_schema,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy,
25            lower_data_row_direct_projection_slots_with_schema,
26            lower_direct_projection_slots_with_schema, lower_projection_identity,
27            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
28            residual_query_predicate_after_filtered_access_contract,
29            resolved_grouped_distinct_execution_strategy_with_schema_info,
30        },
31        schema::SchemaInfo,
32    },
33    error::InternalError,
34    model::{
35        entity::EntityModel,
36        index::{IndexKeyItem, IndexKeyItemsRef},
37    },
38};
39
40impl QueryMode {
41    /// True if this mode represents a load intent.
42    #[must_use]
43    pub const fn is_load(&self) -> bool {
44        match self {
45            Self::Load(_) => true,
46            Self::Delete(_) => false,
47        }
48    }
49
50    /// True if this mode represents a delete intent.
51    #[must_use]
52    pub const fn is_delete(&self) -> bool {
53        match self {
54            Self::Delete(_) => true,
55            Self::Load(_) => false,
56        }
57    }
58}
59
60impl LogicalPlan {
61    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
62    #[must_use]
63    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64        match self {
65            Self::Scalar(plan) => plan,
66            Self::Grouped(plan) => &plan.scalar,
67        }
68    }
69
70    /// Borrow scalar semantic fields mutably across logical variants for tests.
71    #[must_use]
72    #[cfg(test)]
73    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74        match self {
75            Self::Scalar(plan) => plan,
76            Self::Grouped(plan) => &mut plan.scalar,
77        }
78    }
79
80    /// Test-only shorthand for explicit scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84        self.scalar_semantics()
85    }
86
87    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
88    #[must_use]
89    #[cfg(test)]
90    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91        self.scalar_semantics_mut()
92    }
93}
94
95impl AccessPlannedQuery {
96    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
97    #[must_use]
98    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99        self.logical.scalar_semantics()
100    }
101
102    /// Borrow scalar missing-row consistency without exposing the full scalar
103    /// plan to executor owners that only need row-presence policy.
104    #[must_use]
105    #[cfg(any(test, feature = "sql"))]
106    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
107        self.scalar_plan().consistency
108    }
109
110    /// Borrow scalar semantic fields mutably across logical variants for tests.
111    #[must_use]
112    #[cfg(test)]
113    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
114        self.logical.scalar_semantics_mut()
115    }
116
117    /// Test-only shorthand for explicit scalar plan borrowing.
118    #[must_use]
119    #[cfg(test)]
120    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
121        self.scalar_plan()
122    }
123
124    /// Test-only shorthand for explicit mutable scalar plan borrowing.
125    #[must_use]
126    #[cfg(test)]
127    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
128        self.scalar_plan_mut()
129    }
130
131    /// Borrow grouped semantic fields when this plan is grouped.
132    #[must_use]
133    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
134        match &self.logical {
135            LogicalPlan::Scalar(_) => None,
136            LogicalPlan::Grouped(plan) => Some(plan),
137        }
138    }
139
140    /// Lower this plan into one canonical planner-owned projection semantic spec.
141    #[must_use]
142    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
143        if let Some(static_contract) = &self.static_execution_planning_contract {
144            return static_contract.projection_spec.clone();
145        }
146
147        lower_projection_intent(model, &self.logical, &self.projection_selection)
148    }
149
150    /// Lower this plan into one projection semantic shape for identity hashing.
151    #[must_use]
152    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
153        lower_projection_identity(&self.logical, &self.projection_selection)
154    }
155
156    /// Return the executor-facing predicate after removing only filtered-index
157    /// guard clauses the chosen access path already proves.
158    ///
159    /// This conservative form is used by preparation/explain surfaces that
160    /// still need to see access-bound equalities as index-predicate input.
161    #[must_use]
162    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
163        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
164            return static_contract.execution_preparation_predicate.clone();
165        }
166
167        derive_execution_preparation_predicate(self)
168    }
169
170    /// Return the executor-facing residual predicate after removing any
171    /// filtered-index guard clauses and fixed access-bound equalities already
172    /// guaranteed by the chosen path.
173    #[must_use]
174    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
175        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
176            return static_contract.residual_filter_predicate.clone();
177        }
178
179        derive_residual_filter_predicate(self)
180    }
181
182    /// Return whether one explicit residual predicate survives access
183    /// planning and still participates in residual execution.
184    #[must_use]
185    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
186        self.effective_execution_predicate().is_some()
187    }
188
189    /// Borrow the planner-owned residual scalar filter expression when one
190    /// surviving semantic remainder still requires runtime evaluation.
191    #[must_use]
192    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
193        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
194            return static_contract.residual_filter_expr.as_ref();
195        }
196
197        if !derive_has_residual_filter(self) {
198            return None;
199        }
200
201        self.scalar_plan().filter_expr.as_ref()
202    }
203
204    /// Return whether one explicit residual scalar filter expression survives
205    /// access planning and still requires runtime evaluation.
206    #[must_use]
207    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
208        self.residual_filter_expr().is_some()
209    }
210
211    /// Borrow the planner-compiled execution-preparation predicate program.
212    #[must_use]
213    pub(in crate::db) const fn execution_preparation_compiled_predicate(
214        &self,
215    ) -> Option<&PredicateProgram> {
216        self.static_execution_planning_contract()
217            .execution_preparation_compiled_predicate
218            .as_ref()
219    }
220
221    /// Borrow the planner-compiled effective runtime predicate program.
222    #[must_use]
223    pub(in crate::db) const fn effective_runtime_compiled_predicate(
224        &self,
225    ) -> Option<&PredicateProgram> {
226        match self
227            .static_execution_planning_contract()
228            .effective_runtime_filter_program
229            .as_ref()
230        {
231            Some(program) => program.predicate_program(),
232            None => None,
233        }
234    }
235
236    /// Borrow the planner-compiled effective runtime scalar filter expression.
237    #[cfg(test)]
238    #[must_use]
239    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
240        &self,
241    ) -> Option<&CompiledExpr> {
242        match self
243            .static_execution_planning_contract()
244            .effective_runtime_filter_program
245            .as_ref()
246        {
247            Some(program) => program.expression_filter(),
248            None => None,
249        }
250    }
251
252    /// Borrow the planner-frozen effective runtime scalar filter program.
253    #[must_use]
254    pub(in crate::db) const fn effective_runtime_filter_program(
255        &self,
256    ) -> Option<&EffectiveRuntimeFilterProgram> {
257        self.static_execution_planning_contract()
258            .effective_runtime_filter_program
259            .as_ref()
260    }
261
262    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
263    #[must_use]
264    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
265        if !self.scalar_plan().distinct {
266            return DistinctExecutionStrategy::None;
267        }
268
269        // DISTINCT on duplicate-safe single-path access shapes is a planner
270        // no-op for runtime dedup mechanics. Composite shapes can surface
271        // duplicate keys and therefore retain explicit dedup execution.
272        match distinct_runtime_dedup_strategy(&self.access) {
273            Some(strategy) => strategy,
274            None => DistinctExecutionStrategy::None,
275        }
276    }
277
278    /// Freeze one planner-owned route profile after model validation completes.
279    #[cfg(test)]
280    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
281        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
282    }
283
284    /// Freeze one planner-owned route profile from accepted schema authority.
285    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
286        &mut self,
287        schema_info: &SchemaInfo,
288    ) {
289        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
290    }
291
292    /// Freeze model-only executor metadata after logical/access planning completes.
293    #[cfg(test)]
294    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
295        &mut self,
296        model: &EntityModel,
297    ) -> Result<(), InternalError> {
298        self.finalize_static_execution_planning_contract_for_model_with_schema(
299            model,
300            SchemaInfo::cached_for_generated_entity_model(model),
301        )
302    }
303
304    /// Freeze planner-owned executor metadata with explicit schema authority.
305    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
306        &mut self,
307        model: &EntityModel,
308        schema_info: &SchemaInfo,
309    ) -> Result<(), InternalError> {
310        self.static_execution_planning_contract = Some(
311            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
312        );
313
314        Ok(())
315    }
316
317    /// Build one immutable execution-shape signature contract for runtime layers.
318    #[must_use]
319    pub(in crate::db) fn execution_shape_signature(
320        &self,
321        entity_path: &'static str,
322    ) -> ExecutionShapeSignature {
323        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
324    }
325
326    /// Return whether the chosen access contract fully satisfies the current
327    /// scalar query predicate without any additional post-access filtering.
328    #[must_use]
329    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
330        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
331            return self.scalar_plan().predicate.is_some()
332                && static_contract.residual_filter_predicate.is_none()
333                && static_contract.residual_filter_expr.is_none();
334        }
335
336        derive_predicate_fully_satisfied_by_access_contract(self)
337    }
338
339    /// Borrow the planner-frozen compiled scalar projection program.
340    #[must_use]
341    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
342        self.static_execution_planning_contract()
343            .scalar_projection_plan
344            .as_deref()
345    }
346
347    /// Return whether planner-owned static execution metadata has already been frozen.
348    #[must_use]
349    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
350        self.static_execution_planning_contract.is_some()
351    }
352
353    /// Borrow the planner-frozen ordered primary-key field names.
354    #[must_use]
355    pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
356        self.static_execution_planning_contract()
357            .primary_key_names
358            .iter()
359            .map(String::as_str)
360            .collect()
361    }
362
363    /// Borrow the planner-frozen projection slot reachability set.
364    #[must_use]
365    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
366        self.static_execution_planning_contract()
367            .projection_referenced_slots
368            .as_slice()
369    }
370
371    /// Borrow the planner-frozen mask for direct projected output slots.
372    #[must_use]
373    #[cfg(any(test, feature = "diagnostics"))]
374    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
375        self.static_execution_planning_contract()
376            .projected_slot_mask
377            .as_slice()
378    }
379
380    /// Return whether projection remains the full model-identity field list.
381    #[must_use]
382    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
383        self.static_execution_planning_contract()
384            .projection_is_model_identity
385    }
386
387    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
388    #[must_use]
389    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
390        self.static_execution_planning_contract()
391            .order_referenced_slots
392            .as_deref()
393    }
394
395    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
396    #[must_use]
397    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
398        self.static_execution_planning_contract()
399            .resolved_order
400            .as_ref()
401    }
402
403    /// Borrow the planner-frozen access slot map used by index predicate compilation.
404    #[must_use]
405    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
406        self.static_execution_planning_contract()
407            .slot_map
408            .as_deref()
409    }
410
411    /// Borrow grouped aggregate execution specs already resolved during static planning.
412    #[must_use]
413    pub(in crate::db) fn grouped_aggregate_execution_specs(
414        &self,
415    ) -> Option<&[GroupedAggregateExecutionSpec]> {
416        self.static_execution_planning_contract()
417            .grouped_aggregate_execution_specs
418            .as_deref()
419    }
420
421    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
422    #[must_use]
423    pub(in crate::db) const fn grouped_distinct_execution_strategy(
424        &self,
425    ) -> Option<&GroupedDistinctExecutionStrategy> {
426        self.static_execution_planning_contract()
427            .grouped_distinct_execution_strategy
428            .as_ref()
429    }
430
431    /// Borrow the frozen projection semantic shape without reopening model ownership.
432    #[must_use]
433    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
434        &self.static_execution_planning_contract().projection_spec
435    }
436
437    /// Borrow the frozen direct projection slots without reopening model ownership.
438    #[must_use]
439    #[cfg(any(test, feature = "sql"))]
440    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
441        self.static_execution_planning_contract()
442            .projection_direct_slots
443            .as_deref()
444    }
445
446    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
447    #[must_use]
448    #[cfg(any(test, feature = "sql"))]
449    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
450        self.static_execution_planning_contract()
451            .projection_data_row_direct_slots
452            .as_deref()
453    }
454
455    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
456    #[must_use]
457    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
458        self.static_execution_planning_contract()
459            .index_compile_targets
460            .as_deref()
461    }
462
463    const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
464        self.static_execution_planning_contract
465            .as_ref()
466            .expect("access-planned queries must freeze static execution planning contract before execution")
467    }
468}
469
470fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
471    match access {
472        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
473            Some(DistinctExecutionStrategy::PreOrdered)
474        }
475        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
476            Some(DistinctExecutionStrategy::HashMaterialize)
477        }
478        AccessPlan::Path(_) => None,
479    }
480}
481
482fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
483    let is_grouped_safe = plan
484        .grouped_plan()
485        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
486
487    ContinuationPolicy::new(
488        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
489        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
490        is_grouped_safe,
491    )
492}
493
/// Test-only: builds the planner route profile for `plan`, taking the
/// primary-key names from `model`.
///
/// The secondary order contract is computed from the plan's ORDER BY (when
/// present) against those names, and feeds the pushdown-eligibility
/// derivation before being stored in the profile.
#[must_use]
#[cfg(test)]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let pk_names = ordered_primary_key_names(model);
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => order.deterministic_secondary_order_contract_fields(pk_names.as_slice()),
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
512
513/// Project one planner-owned route profile from accepted schema authority.
514#[must_use]
515pub(in crate::db) fn project_planner_route_profile_for_schema(
516    schema_info: &SchemaInfo,
517    plan: &AccessPlannedQuery,
518) -> PlannerRouteProfile {
519    let primary_key_names = primary_key_names_from_schema(schema_info);
520    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
521        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
522    });
523
524    PlannerRouteProfile::new(
525        derive_continuation_policy_validated(plan),
526        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
527        secondary_order_contract,
528    )
529}
530
531fn project_static_execution_planning_contract_for_model(
532    model: &EntityModel,
533    schema_info: &SchemaInfo,
534    plan: &AccessPlannedQuery,
535) -> Result<StaticExecutionPlanningContract, InternalError> {
536    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
537    let execution_preparation_predicate = plan.execution_preparation_predicate();
538    let residual_filter_predicate = derive_residual_filter_predicate(plan);
539    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
540    let execution_preparation_compiled_predicate =
541        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
542    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
543        schema_info,
544        residual_filter_expr.as_ref(),
545        residual_filter_predicate.as_ref(),
546    )?;
547    let scalar_projection_plan = if plan.grouped_plan().is_none() {
548        Some(
549                compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
550                    .ok_or_else(|| {
551                        InternalError::query_executor_invariant(
552                            "scalar projection program must compile during static planning finalization",
553                        )
554                    })?
555                    .iter()
556                    .map(CompiledExpr::compile)
557                    .collect(),
558            )
559    } else {
560        None
561    };
562    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
563        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
564    let projection_direct_slots = lower_direct_projection_slots_with_schema(
565        model,
566        schema_info,
567        &plan.logical,
568        &plan.projection_selection,
569    );
570    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
571        model,
572        schema_info,
573        &plan.logical,
574        &plan.projection_selection,
575    );
576    let projection_referenced_slots =
577        projection_spec.referenced_slots_for_schema(model, schema_info)?;
578    let projected_slot_mask =
579        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
580    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
581    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
582    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
583    let slot_map = slot_map_for_schema_plan(schema_info, plan);
584    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
585
586    Ok(StaticExecutionPlanningContract {
587        primary_key_names: schema_info.primary_key_names().to_vec(),
588        projection_spec,
589        execution_preparation_predicate,
590        residual_filter_expr,
591        residual_filter_predicate,
592        execution_preparation_compiled_predicate,
593        effective_runtime_filter_program,
594        scalar_projection_plan,
595        grouped_aggregate_execution_specs,
596        grouped_distinct_execution_strategy,
597        projection_direct_slots,
598        projection_data_row_direct_slots,
599        projection_referenced_slots,
600        projected_slot_mask,
601        projection_is_model_identity,
602        resolved_order,
603        order_referenced_slots,
604        slot_map,
605        index_compile_targets,
606    })
607}
608
// Test-only: fetch the primary-key names reported by the entity model.
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    let names = model.primary_key_names();

    names
}
613
614fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
615    schema_info
616        .primary_key_names()
617        .iter()
618        .map(String::as_str)
619        .collect()
620}
621
622// Compile the executor-owned residual scalar filter contract once from the
623// planner-derived residual artifacts so runtime never has to rediscover
624// residual presence or shape from semantic/filter/pushdown state.
625fn compile_effective_runtime_filter_program(
626    schema_info: &SchemaInfo,
627    residual_filter_expr: Option<&Expr>,
628    residual_filter_predicate: Option<&Predicate>,
629) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
630    // Keep the existing predicate fast path when the residual semantics still
631    // fit the derived predicate contract. The expression-owned lane is only
632    // needed once pushdown loses semantic coverage and a residual predicate no
633    // longer exists.
634    if let Some(predicate) = residual_filter_predicate {
635        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
636            PredicateProgram::compile_with_schema_info(schema_info, predicate),
637        )));
638    }
639
640    if let Some(filter_expr) = residual_filter_expr {
641        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
642            .ok_or_else(|| {
643                InternalError::query_invalid_logical_plan(
644                    "effective runtime scalar filter expression must compile during static planning finalization",
645                )
646            })?;
647
648        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
649            CompiledExpr::compile(&compiled),
650        )));
651    }
652
653    Ok(None)
654}
655
656// Derive the executor-preparation predicate once from the selected access path.
657// This strips only filtered-index guard clauses while preserving access-bound
658// equalities that still matter to preparation/explain consumers.
659fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
660    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
661
662    match plan.access.selected_index_contract() {
663        Some(index) => {
664            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
665        }
666        None => Some(query_predicate.clone()),
667    }
668}
669
670// Derive the final residual predicate once from the already-filtered
671// preparation predicate plus any equality bounds guaranteed by the concrete
672// access path.
673fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
674    let filtered_residual = derive_execution_preparation_predicate(plan);
675    let filtered_residual = filtered_residual.as_ref()?;
676
677    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
678}
679
680// Derive the explicit residual semantic expression once for finalized plans.
681// The residual expression remains the planner-owned semantic filter when any
682// runtime filtering still survives access satisfaction.
683fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
684    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
685    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
686        return None;
687    }
688
689    Some(filter_expr.clone())
690}
691
692// Derive the explicit residual semantic expression during finalization using
693// the trusted entity schema so compare-family literal normalization matches the
694// planner-owned predicate contract before residual ownership is decided.
695fn derive_residual_filter_expr_for_model(
696    model: &EntityModel,
697    plan: &AccessPlannedQuery,
698) -> Option<Expr> {
699    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
700    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
701        return None;
702    }
703
704    Some(filter_expr.clone())
705}
706
707// Return whether any residual filtering survives after access planning. This
708// helper exists only for pre-finalization assembly; finalized plans must read
709// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
710fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
711    match (
712        plan.scalar_plan().filter_expr.as_ref(),
713        plan.scalar_plan().predicate.as_ref(),
714    ) {
715        (None, None) => false,
716        (Some(_), None) => true,
717        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
718    }
719}
720
721// Return true when the planner-owned predicate contract is fully satisfied by
722// access planning and no semantic residual filter expression survives.
723fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
724    plan.scalar_plan().predicate.is_some()
725        && derive_residual_filter_predicate(plan).is_none()
726        && derive_residual_filter_expr(plan).is_none()
727}
728
729// Return true when the semantic filter expression is entirely represented by
730// the planner-owned predicate contract and the chosen access path satisfies
731// that predicate without any runtime remainder.
732const fn derive_semantic_filter_fully_satisfied_by_access_contract(
733    plan: &AccessPlannedQuery,
734) -> bool {
735    plan.scalar_plan().filter_expr.is_some()
736        && plan.scalar_plan().predicate.is_some()
737        && plan.scalar_plan().predicate_covers_filter_expr
738}
739
740// Return true when finalized planning can prove that the semantic filter
741// expression is completely represented by the planner-owned predicate contract
742// after aligning compare literals through the trusted entity schema.
743const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
744    _model: &EntityModel,
745    plan: &AccessPlannedQuery,
746) -> bool {
747    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
748}
749
750// Compile one optional planner-frozen predicate program while keeping the
751// static planning assembly path free of repeated `Option` mapping boilerplate.
752fn compile_optional_predicate(
753    schema_info: &SchemaInfo,
754    predicate: Option<&Predicate>,
755) -> Option<PredicateProgram> {
756    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
757}
758
759// Resolve the grouped-only static planning semantics bundle once so grouped
760// aggregate execution specs and grouped DISTINCT strategy stay derived under
761// one shared grouped-plan branch.
762fn resolve_grouped_static_planning_semantics(
763    schema_info: &SchemaInfo,
764    plan: &AccessPlannedQuery,
765    projection_spec: &ProjectionSpec,
766) -> Result<
767    (
768        Option<Vec<GroupedAggregateExecutionSpec>>,
769        Option<GroupedDistinctExecutionStrategy>,
770    ),
771    InternalError,
772> {
773    let Some(grouped) = plan.grouped_plan() else {
774        return Ok((None, None));
775    };
776
777    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
778        projection_spec,
779        grouped.group.group_fields.as_slice(),
780        grouped.group.aggregates.as_slice(),
781    )?;
782    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
783
784    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
785        schema_info,
786        aggregate_specs.as_slice(),
787    )?);
788    let grouped_distinct_execution_strategy = Some(
789        resolved_grouped_distinct_execution_strategy_with_schema_info(
790            schema_info,
791            grouped.group.group_fields.as_slice(),
792            grouped.group.aggregates.as_slice(),
793            grouped.having_expr.as_ref(),
794        )?,
795    );
796
797    Ok((
798        grouped_aggregate_execution_specs,
799        grouped_distinct_execution_strategy,
800    ))
801}
802
803fn extend_grouped_having_aggregate_specs(
804    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
805    grouped: &GroupPlan,
806) -> Result<(), InternalError> {
807    if let Some(having_expr) = grouped.having_expr.as_ref() {
808        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
809    }
810
811    Ok(())
812}
813
814fn collect_grouped_having_expr_aggregate_specs(
815    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
816    expr: &Expr,
817) -> Result<(), InternalError> {
818    if !expr.contains_aggregate() {
819        return Ok(());
820    }
821
822    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
823        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
824
825        if aggregate_specs
826            .iter()
827            .all(|current| current != &aggregate_spec)
828        {
829            aggregate_specs.push(aggregate_spec);
830        }
831
832        Ok(())
833    })
834}
835
836fn projected_slot_mask_for_spec(
837    model: &EntityModel,
838    direct_projection_slots: Option<&[usize]>,
839) -> Vec<bool> {
840    let schema_slot_len = direct_projection_slots
841        .and_then(|slots| slots.iter().copied().max())
842        .map_or(0, |slot| slot.saturating_add(1));
843    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
844
845    let Some(direct_projection_slots) = direct_projection_slots else {
846        return projected_slots;
847    };
848
849    for slot in direct_projection_slots.iter().copied() {
850        if let Some(projected) = projected_slots.get_mut(slot) {
851            *projected = true;
852        }
853    }
854
855    projected_slots
856}
857
858fn resolved_order_for_plan(
859    schema_info: &SchemaInfo,
860    plan: &AccessPlannedQuery,
861) -> Result<Option<ResolvedOrder>, InternalError> {
862    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
863        return Ok(None);
864    }
865
866    let Some(order) = plan.scalar_plan().order.as_ref() else {
867        return Ok(None);
868    };
869
870    let mut fields = Vec::with_capacity(order.fields.len());
871    for term in &order.fields {
872        fields.push(ResolvedOrderField::new(
873            resolved_order_value_source_for_term(schema_info, term)?,
874            term.direction(),
875        ));
876    }
877
878    Ok(Some(ResolvedOrder::new(fields)))
879}
880
881fn resolved_order_value_source_for_term(
882    schema_info: &SchemaInfo,
883    term: &crate::db::query::plan::OrderTerm,
884) -> Result<ResolvedOrderValueSource, InternalError> {
885    if term.direct_field().is_none() {
886        let rendered = term.rendered_label();
887        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
888        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
889            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
890
891        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
892            &compiled,
893        )));
894    }
895
896    let field = term
897        .direct_field()
898        .expect("direct-field order branch should only execute for field-backed terms");
899    let slot = resolve_required_schema_slot(schema_info, field, || {
900        InternalError::query_invalid_logical_plan(format!(
901            "order expression references unknown field '{field}'",
902        ))
903    })?;
904
905    Ok(ResolvedOrderValueSource::direct_field(slot))
906}
907
908fn validate_resolved_order_expr_fields(
909    schema_info: &SchemaInfo,
910    expr: &Expr,
911    rendered: &str,
912) -> Result<(), InternalError> {
913    expr.try_for_each_tree_expr(&mut |node| match node {
914        Expr::Field(field_id) => {
915            resolve_required_schema_slot(schema_info, field_id.as_str(), || {
916                InternalError::query_invalid_logical_plan(format!(
917                    "order expression references unknown field '{rendered}'",
918                ))
919            })
920            .map(|_| ())
921        }
922        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
923        #[cfg(test)]
924        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
925        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
926        _ => Ok(()),
927    })
928}
929
930// Resolve one schema-authoritative field slot while keeping planner
931// invalid-logical-plan error construction at the callsite that owns the
932// diagnostic wording.
933fn resolve_required_schema_slot<F>(
934    schema_info: &SchemaInfo,
935    field: &str,
936    invalid_plan_error: F,
937) -> Result<usize, InternalError>
938where
939    F: FnOnce() -> InternalError,
940{
941    schema_info
942        .field_slot_index(field)
943        .ok_or_else(invalid_plan_error)
944}
945
946// Keep the scalar-order expression seam violation text under one helper so the
947// parse validation and compile validation paths do not drift.
948fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
949    InternalError::query_invalid_logical_plan(format!(
950        "order expression '{rendered}' did not stay on the scalar expression seam",
951    ))
952}
953
954// Keep one stable executor-facing slot list for grouped order terms after the
955// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
956// now consumes this same referenced-slot contract instead of re-deriving order
957// sources from planner strategy at runtime.
958fn order_referenced_slots_for_resolved_order(
959    resolved_order: Option<&ResolvedOrder>,
960) -> Option<Vec<usize>> {
961    Some(resolved_order?.referenced_slots())
962}
963
964fn slot_map_for_schema_plan(
965    schema_info: &SchemaInfo,
966    plan: &AccessPlannedQuery,
967) -> Option<Vec<usize>> {
968    let executable = plan.access.executable_contract();
969
970    resolved_index_slots_for_access_path(schema_info, &executable)
971}
972
973fn resolved_index_slots_for_access_path(
974    schema_info: &SchemaInfo,
975    access: &ExecutableAccessPlan<'_, crate::value::Value>,
976) -> Option<Vec<usize>> {
977    let path = access.as_path()?;
978    let path_facts = path.shape_facts();
979    let key_items = path_facts.index_key_items_for_slot_map()?;
980    let mut slots = Vec::new();
981
982    match key_items.key_items() {
983        SemanticIndexKeyItemsRef::Fields(fields) => {
984            slots.reserve(fields.len());
985            for field_name in fields {
986                let slot = schema_info.field_slot_index(field_name)?;
987                slots.push(slot);
988            }
989        }
990        SemanticIndexKeyItemsRef::Accepted(items) => {
991            slots.reserve(items.len());
992            for key_item in items {
993                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
994                slots.push(slot);
995            }
996        }
997        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
998            slots.reserve(fields.len());
999            for &field_name in fields {
1000                let slot = schema_info.field_slot_index(field_name)?;
1001                slots.push(slot);
1002            }
1003        }
1004        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1005            slots.reserve(items.len());
1006            for key_item in items {
1007                let slot = schema_info.field_slot_index(key_item.field())?;
1008                slots.push(slot);
1009            }
1010        }
1011    }
1012
1013    Some(slots)
1014}
1015
1016fn index_compile_targets_for_schema_plan(
1017    schema_info: &SchemaInfo,
1018    plan: &AccessPlannedQuery,
1019) -> Option<Vec<IndexCompileTarget>> {
1020    let executable = plan.access.executable_contract();
1021    let path = executable.as_path()?;
1022    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1023    let mut targets = Vec::new();
1024
1025    match key_items.key_items() {
1026        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1027            return None;
1028        }
1029        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1030            for (component_index, &field_name) in fields.iter().enumerate() {
1031                let field_slot = schema_info.field_slot_index(field_name)?;
1032                targets.push(IndexCompileTarget {
1033                    component_index,
1034                    field_slot,
1035                    key_item: IndexKeyItem::Field(field_name),
1036                });
1037            }
1038        }
1039        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1040            for (component_index, &key_item) in items.iter().enumerate() {
1041                let field_slot = schema_info.field_slot_index(key_item.field())?;
1042                targets.push(IndexCompileTarget {
1043                    component_index,
1044                    field_slot,
1045                    key_item,
1046                });
1047            }
1048        }
1049    }
1050
1051    Some(targets)
1052}