Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
9        predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19                compile_scalar_projection_plan_with_schema,
20            },
21            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22            grouped_cursor_policy_violation, grouped_plan_strategy,
23            lower_data_row_direct_projection_slots_with_schema,
24            lower_direct_projection_slots_with_schema, lower_projection_identity,
25            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26            residual_query_predicate_after_filtered_access_contract,
27            resolved_grouped_distinct_execution_strategy_with_schema_info,
28        },
29        schema::SchemaInfo,
30    },
31    error::InternalError,
32    model::{
33        entity::EntityModel,
34        index::{IndexKeyItem, IndexKeyItemsRef},
35    },
36};
37
38impl QueryMode {
39    /// True if this mode represents a load intent.
40    #[must_use]
41    pub const fn is_load(&self) -> bool {
42        match self {
43            Self::Load(_) => true,
44            Self::Delete(_) => false,
45        }
46    }
47
48    /// True if this mode represents a delete intent.
49    #[must_use]
50    pub const fn is_delete(&self) -> bool {
51        match self {
52            Self::Delete(_) => true,
53            Self::Load(_) => false,
54        }
55    }
56}
57
58impl LogicalPlan {
59    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
60    #[must_use]
61    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62        match self {
63            Self::Scalar(plan) => plan,
64            Self::Grouped(plan) => &plan.scalar,
65        }
66    }
67
68    /// Borrow scalar semantic fields mutably across logical variants for tests.
69    #[must_use]
70    #[cfg(test)]
71    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72        match self {
73            Self::Scalar(plan) => plan,
74            Self::Grouped(plan) => &mut plan.scalar,
75        }
76    }
77
78    /// Test-only shorthand for explicit scalar semantic borrowing.
79    #[must_use]
80    #[cfg(test)]
81    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82        self.scalar_semantics()
83    }
84
85    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
86    #[must_use]
87    #[cfg(test)]
88    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89        self.scalar_semantics_mut()
90    }
91}
92
93impl AccessPlannedQuery {
94    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
95    #[must_use]
96    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97        self.logical.scalar_semantics()
98    }
99
100    /// Borrow scalar missing-row consistency without exposing the full scalar
101    /// plan to executor owners that only need row-presence policy.
102    #[must_use]
103    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
104        self.scalar_plan().consistency
105    }
106
107    /// Borrow scalar semantic fields mutably across logical variants for tests.
108    #[must_use]
109    #[cfg(test)]
110    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
111        self.logical.scalar_semantics_mut()
112    }
113
114    /// Test-only shorthand for explicit scalar plan borrowing.
115    #[must_use]
116    #[cfg(test)]
117    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
118        self.scalar_plan()
119    }
120
121    /// Test-only shorthand for explicit mutable scalar plan borrowing.
122    #[must_use]
123    #[cfg(test)]
124    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
125        self.scalar_plan_mut()
126    }
127
128    /// Borrow grouped semantic fields when this plan is grouped.
129    #[must_use]
130    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
131        match &self.logical {
132            LogicalPlan::Scalar(_) => None,
133            LogicalPlan::Grouped(plan) => Some(plan),
134        }
135    }
136
137    /// Lower this plan into one canonical planner-owned projection semantic spec.
138    #[must_use]
139    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
140        if let Some(static_contract) = &self.static_execution_planning_contract {
141            return static_contract.projection_spec.clone();
142        }
143
144        lower_projection_intent(model, &self.logical, &self.projection_selection)
145    }
146
147    /// Lower this plan into one projection semantic shape for identity hashing.
148    #[must_use]
149    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
150        lower_projection_identity(&self.logical, &self.projection_selection)
151    }
152
153    /// Return the executor-facing predicate after removing only filtered-index
154    /// guard clauses the chosen access path already proves.
155    ///
156    /// This conservative form is used by preparation/explain surfaces that
157    /// still need to see access-bound equalities as index-predicate input.
158    #[must_use]
159    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
160        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
161            return static_contract.execution_preparation_predicate.clone();
162        }
163
164        derive_execution_preparation_predicate(self)
165    }
166
167    /// Return the executor-facing residual predicate after removing any
168    /// filtered-index guard clauses and fixed access-bound equalities already
169    /// guaranteed by the chosen path.
170    #[must_use]
171    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
172        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
173            return static_contract.residual_filter_predicate.clone();
174        }
175
176        derive_residual_filter_predicate(self)
177    }
178
179    /// Return whether one explicit residual predicate survives access
180    /// planning and still participates in residual execution.
181    #[must_use]
182    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
183        self.effective_execution_predicate().is_some()
184    }
185
186    /// Borrow the planner-owned residual scalar filter expression when one
187    /// surviving semantic remainder still requires runtime evaluation.
188    #[must_use]
189    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
190        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
191            return static_contract.residual_filter_expr.as_ref();
192        }
193
194        if !derive_has_residual_filter(self) {
195            return None;
196        }
197
198        self.scalar_plan().filter_expr.as_ref()
199    }
200
201    /// Return whether one explicit residual scalar filter expression survives
202    /// access planning and still requires runtime evaluation.
203    #[must_use]
204    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
205        self.residual_filter_expr().is_some()
206    }
207
208    /// Borrow the planner-compiled execution-preparation predicate program.
209    #[must_use]
210    pub(in crate::db) const fn execution_preparation_compiled_predicate(
211        &self,
212    ) -> Option<&PredicateProgram> {
213        self.static_execution_planning_contract()
214            .execution_preparation_compiled_predicate
215            .as_ref()
216    }
217
218    /// Borrow the planner-compiled effective runtime predicate program.
219    #[must_use]
220    pub(in crate::db) const fn effective_runtime_compiled_predicate(
221        &self,
222    ) -> Option<&PredicateProgram> {
223        match self
224            .static_execution_planning_contract()
225            .effective_runtime_filter_program
226            .as_ref()
227        {
228            Some(program) => program.predicate_program(),
229            None => None,
230        }
231    }
232
233    /// Borrow the planner-compiled effective runtime scalar filter expression.
234    #[cfg(test)]
235    #[must_use]
236    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
237        &self,
238    ) -> Option<&CompiledExpr> {
239        match self
240            .static_execution_planning_contract()
241            .effective_runtime_filter_program
242            .as_ref()
243        {
244            Some(program) => program.expression_filter(),
245            None => None,
246        }
247    }
248
249    /// Borrow the planner-frozen effective runtime scalar filter program.
250    #[must_use]
251    pub(in crate::db) const fn effective_runtime_filter_program(
252        &self,
253    ) -> Option<&EffectiveRuntimeFilterProgram> {
254        self.static_execution_planning_contract()
255            .effective_runtime_filter_program
256            .as_ref()
257    }
258
259    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
260    #[must_use]
261    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
262        if !self.scalar_plan().distinct {
263            return DistinctExecutionStrategy::None;
264        }
265
266        // DISTINCT on duplicate-safe single-path access shapes is a planner
267        // no-op for runtime dedup mechanics. Composite shapes can surface
268        // duplicate keys and therefore retain explicit dedup execution.
269        match distinct_runtime_dedup_strategy(&self.access) {
270            Some(strategy) => strategy,
271            None => DistinctExecutionStrategy::None,
272        }
273    }
274
275    /// Freeze one planner-owned route profile after model validation completes.
276    #[cfg(test)]
277    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
278        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
279    }
280
281    /// Freeze one planner-owned route profile from accepted schema authority.
282    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
283        &mut self,
284        schema_info: &SchemaInfo,
285    ) {
286        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
287    }
288
289    /// Freeze model-only executor metadata after logical/access planning completes.
290    #[cfg(test)]
291    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
292        &mut self,
293        model: &EntityModel,
294    ) -> Result<(), InternalError> {
295        self.finalize_static_execution_planning_contract_for_model_with_schema(
296            model,
297            SchemaInfo::cached_for_generated_entity_model(model),
298        )
299    }
300
301    /// Freeze planner-owned executor metadata with explicit schema authority.
302    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
303        &mut self,
304        model: &EntityModel,
305        schema_info: &SchemaInfo,
306    ) -> Result<(), InternalError> {
307        self.static_execution_planning_contract = Some(
308            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
309        );
310
311        Ok(())
312    }
313
314    /// Build one immutable execution-shape signature contract for runtime layers.
315    #[must_use]
316    pub(in crate::db) fn execution_shape_signature(
317        &self,
318        entity_path: &'static str,
319    ) -> ExecutionShapeSignature {
320        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
321    }
322
323    /// Return whether the chosen access contract fully satisfies the current
324    /// scalar query predicate without any additional post-access filtering.
325    #[must_use]
326    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
327        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
328            return self.scalar_plan().predicate.is_some()
329                && static_contract.residual_filter_predicate.is_none()
330                && static_contract.residual_filter_expr.is_none();
331        }
332
333        derive_predicate_fully_satisfied_by_access_contract(self)
334    }
335
336    /// Borrow the planner-frozen compiled scalar projection program.
337    #[must_use]
338    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
339        self.static_execution_planning_contract()
340            .scalar_projection_plan
341            .as_deref()
342    }
343
344    /// Return whether planner-owned static execution metadata has already been frozen.
345    #[must_use]
346    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
347        self.static_execution_planning_contract.is_some()
348    }
349
350    /// Borrow the planner-frozen ordered primary-key field names.
351    #[must_use]
352    pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
353        self.static_execution_planning_contract()
354            .primary_key_names
355            .iter()
356            .map(String::as_str)
357            .collect()
358    }
359
360    /// Borrow the planner-frozen projection slot reachability set.
361    #[must_use]
362    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
363        self.static_execution_planning_contract()
364            .projection_referenced_slots
365            .as_slice()
366    }
367
368    /// Borrow the planner-frozen mask for direct projected output slots.
369    #[must_use]
370    #[cfg(any(test, feature = "diagnostics"))]
371    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
372        self.static_execution_planning_contract()
373            .projected_slot_mask
374            .as_slice()
375    }
376
377    /// Return whether projection remains the full model-identity field list.
378    #[must_use]
379    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
380        self.static_execution_planning_contract()
381            .projection_is_model_identity
382    }
383
384    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
385    #[must_use]
386    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
387        self.static_execution_planning_contract()
388            .order_referenced_slots
389            .as_deref()
390    }
391
392    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
393    #[must_use]
394    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
395        self.static_execution_planning_contract()
396            .resolved_order
397            .as_ref()
398    }
399
400    /// Borrow the planner-frozen access slot map used by index predicate compilation.
401    #[must_use]
402    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
403        self.static_execution_planning_contract()
404            .slot_map
405            .as_deref()
406    }
407
408    /// Borrow grouped aggregate execution specs already resolved during static planning.
409    #[must_use]
410    pub(in crate::db) fn grouped_aggregate_execution_specs(
411        &self,
412    ) -> Option<&[GroupedAggregateExecutionSpec]> {
413        self.static_execution_planning_contract()
414            .grouped_aggregate_execution_specs
415            .as_deref()
416    }
417
418    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
419    #[must_use]
420    pub(in crate::db) const fn grouped_distinct_execution_strategy(
421        &self,
422    ) -> Option<&GroupedDistinctExecutionStrategy> {
423        self.static_execution_planning_contract()
424            .grouped_distinct_execution_strategy
425            .as_ref()
426    }
427
428    /// Borrow the frozen projection semantic shape without reopening model ownership.
429    #[must_use]
430    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
431        &self.static_execution_planning_contract().projection_spec
432    }
433
434    /// Borrow the frozen direct projection slots without reopening model ownership.
435    #[must_use]
436    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
437        self.static_execution_planning_contract()
438            .projection_direct_slots
439            .as_deref()
440    }
441
442    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
443    #[must_use]
444    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
445        self.static_execution_planning_contract()
446            .projection_data_row_direct_slots
447            .as_deref()
448    }
449
450    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
451    #[must_use]
452    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
453        self.static_execution_planning_contract()
454            .index_compile_targets
455            .as_deref()
456    }
457
458    const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
459        self.static_execution_planning_contract
460            .as_ref()
461            .expect("access-planned queries must freeze static execution planning contract before execution")
462    }
463}
464
465fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
466    match access {
467        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
468            Some(DistinctExecutionStrategy::PreOrdered)
469        }
470        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
471            Some(DistinctExecutionStrategy::HashMaterialize)
472        }
473        AccessPlan::Path(_) => None,
474    }
475}
476
477fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
478    let is_grouped_safe = plan
479        .grouped_plan()
480        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
481
482    ContinuationPolicy::new(
483        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
484        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
485        is_grouped_safe,
486    )
487}
488
/// Build the planner route profile for `plan`, taking the primary-key field
/// names from the entity model (test builds only).
#[must_use]
#[cfg(test)]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let primary_key_names = ordered_primary_key_names(model);
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => {
            order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
        }
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
507
508/// Project one planner-owned route profile from accepted schema authority.
509#[must_use]
510pub(in crate::db) fn project_planner_route_profile_for_schema(
511    schema_info: &SchemaInfo,
512    plan: &AccessPlannedQuery,
513) -> PlannerRouteProfile {
514    let primary_key_names = primary_key_names_from_schema(schema_info);
515    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
516        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
517    });
518
519    PlannerRouteProfile::new(
520        derive_continuation_policy_validated(plan),
521        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
522        secondary_order_contract,
523    )
524}
525
526fn project_static_execution_planning_contract_for_model(
527    model: &EntityModel,
528    schema_info: &SchemaInfo,
529    plan: &AccessPlannedQuery,
530) -> Result<StaticExecutionPlanningContract, InternalError> {
531    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
532    let execution_preparation_predicate = plan.execution_preparation_predicate();
533    let residual_filter_predicate = derive_residual_filter_predicate(plan);
534    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
535    let execution_preparation_compiled_predicate =
536        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
537    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
538        schema_info,
539        residual_filter_expr.as_ref(),
540        residual_filter_predicate.as_ref(),
541    )?;
542    let scalar_projection_plan = if plan.grouped_plan().is_none() {
543        Some(
544                compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
545                    .ok_or_else(|| {
546                        InternalError::query_executor_invariant(
547                            "scalar projection program must compile during static planning finalization",
548                        )
549                    })?
550                    .iter()
551                    .map(CompiledExpr::compile)
552                    .collect(),
553            )
554    } else {
555        None
556    };
557    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
558        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
559    let projection_direct_slots = lower_direct_projection_slots_with_schema(
560        model,
561        schema_info,
562        &plan.logical,
563        &plan.projection_selection,
564    );
565    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
566        model,
567        schema_info,
568        &plan.logical,
569        &plan.projection_selection,
570    );
571    let projection_referenced_slots =
572        projection_spec.referenced_slots_for_schema(model, schema_info)?;
573    let projected_slot_mask =
574        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
575    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
576    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
577    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
578    let slot_map = slot_map_for_schema_plan(schema_info, plan);
579    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
580
581    Ok(StaticExecutionPlanningContract {
582        primary_key_names: schema_info.primary_key_names().to_vec(),
583        projection_spec,
584        execution_preparation_predicate,
585        residual_filter_expr,
586        residual_filter_predicate,
587        execution_preparation_compiled_predicate,
588        effective_runtime_filter_program,
589        scalar_projection_plan,
590        grouped_aggregate_execution_specs,
591        grouped_distinct_execution_strategy,
592        projection_direct_slots,
593        projection_data_row_direct_slots,
594        projection_referenced_slots,
595        projected_slot_mask,
596        projection_is_model_identity,
597        resolved_order,
598        order_referenced_slots,
599        slot_map,
600        index_compile_targets,
601    })
602}
603
// Test-only helper: return the primary-key field names reported by the entity
// model, exactly as `EntityModel::primary_key_names` yields them.
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    model.primary_key_names()
}
608
609fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
610    schema_info
611        .primary_key_names()
612        .iter()
613        .map(String::as_str)
614        .collect()
615}
616
617// Compile the executor-owned residual scalar filter contract once from the
618// planner-derived residual artifacts so runtime never has to rediscover
619// residual presence or shape from semantic/filter/pushdown state.
620fn compile_effective_runtime_filter_program(
621    schema_info: &SchemaInfo,
622    residual_filter_expr: Option<&Expr>,
623    residual_filter_predicate: Option<&Predicate>,
624) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
625    // Keep the existing predicate fast path when the residual semantics still
626    // fit the derived predicate contract. The expression-owned lane is only
627    // needed once pushdown loses semantic coverage and a residual predicate no
628    // longer exists.
629    if let Some(predicate) = residual_filter_predicate {
630        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
631            PredicateProgram::compile_with_schema_info(schema_info, predicate),
632        )));
633    }
634
635    if let Some(filter_expr) = residual_filter_expr {
636        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
637            .ok_or_else(|| {
638                InternalError::query_invalid_logical_plan(
639                    "effective runtime scalar filter expression must compile during static planning finalization",
640                )
641            })?;
642
643        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
644            CompiledExpr::compile(&compiled),
645        )));
646    }
647
648    Ok(None)
649}
650
651// Derive the executor-preparation predicate once from the selected access path.
652// This strips only filtered-index guard clauses while preserving access-bound
653// equalities that still matter to preparation/explain consumers.
654fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
655    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
656
657    match plan.access.selected_index_contract() {
658        Some(index) => {
659            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
660        }
661        None => Some(query_predicate.clone()),
662    }
663}
664
665// Derive the final residual predicate once from the already-filtered
666// preparation predicate plus any equality bounds guaranteed by the concrete
667// access path.
668fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
669    let filtered_residual = derive_execution_preparation_predicate(plan);
670    let filtered_residual = filtered_residual.as_ref()?;
671
672    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
673}
674
675// Derive the explicit residual semantic expression once for finalized plans.
676// The residual expression remains the planner-owned semantic filter when any
677// runtime filtering still survives access satisfaction.
678fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
679    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
680    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
681        return None;
682    }
683
684    Some(filter_expr.clone())
685}
686
687// Derive the explicit residual semantic expression during finalization using
688// the trusted entity schema so compare-family literal normalization matches the
689// planner-owned predicate contract before residual ownership is decided.
690fn derive_residual_filter_expr_for_model(
691    model: &EntityModel,
692    plan: &AccessPlannedQuery,
693) -> Option<Expr> {
694    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
695    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
696        return None;
697    }
698
699    Some(filter_expr.clone())
700}
701
702// Return whether any residual filtering survives after access planning. This
703// helper exists only for pre-finalization assembly; finalized plans must read
704// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
705fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
706    match (
707        plan.scalar_plan().filter_expr.as_ref(),
708        plan.scalar_plan().predicate.as_ref(),
709    ) {
710        (None, None) => false,
711        (Some(_), None) => true,
712        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
713    }
714}
715
716// Return true when the planner-owned predicate contract is fully satisfied by
717// access planning and no semantic residual filter expression survives.
718fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
719    plan.scalar_plan().predicate.is_some()
720        && derive_residual_filter_predicate(plan).is_none()
721        && derive_residual_filter_expr(plan).is_none()
722}
723
724// Return true when the semantic filter expression is entirely represented by
725// the planner-owned predicate contract and the chosen access path satisfies
726// that predicate without any runtime remainder.
727const fn derive_semantic_filter_fully_satisfied_by_access_contract(
728    plan: &AccessPlannedQuery,
729) -> bool {
730    plan.scalar_plan().filter_expr.is_some()
731        && plan.scalar_plan().predicate.is_some()
732        && plan.scalar_plan().predicate_covers_filter_expr
733}
734
// Finalization-time entry point for the semantic-coverage check. It accepts
// the entity model so finalization call sites have a model-aware signature,
// but the body currently ignores `_model` and delegates to the model-free
// check unchanged.
// NOTE(review): the earlier comment here described aligning compare literals
// through the trusted entity schema; no such alignment happens in this body —
// confirm whether it is performed upstream or is still to be implemented.
const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
    _model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
}
744
745// Compile one optional planner-frozen predicate program while keeping the
746// static planning assembly path free of repeated `Option` mapping boilerplate.
747fn compile_optional_predicate(
748    schema_info: &SchemaInfo,
749    predicate: Option<&Predicate>,
750) -> Option<PredicateProgram> {
751    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
752}
753
754// Resolve the grouped-only static planning semantics bundle once so grouped
755// aggregate execution specs and grouped DISTINCT strategy stay derived under
756// one shared grouped-plan branch.
757fn resolve_grouped_static_planning_semantics(
758    schema_info: &SchemaInfo,
759    plan: &AccessPlannedQuery,
760    projection_spec: &ProjectionSpec,
761) -> Result<
762    (
763        Option<Vec<GroupedAggregateExecutionSpec>>,
764        Option<GroupedDistinctExecutionStrategy>,
765    ),
766    InternalError,
767> {
768    let Some(grouped) = plan.grouped_plan() else {
769        return Ok((None, None));
770    };
771
772    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
773        projection_spec,
774        grouped.group.group_fields.as_slice(),
775        grouped.group.aggregates.as_slice(),
776    )?;
777    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
778
779    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
780        schema_info,
781        aggregate_specs.as_slice(),
782    )?);
783    let grouped_distinct_execution_strategy = Some(
784        resolved_grouped_distinct_execution_strategy_with_schema_info(
785            schema_info,
786            grouped.group.group_fields.as_slice(),
787            grouped.group.aggregates.as_slice(),
788            grouped.having_expr.as_ref(),
789        )?,
790    );
791
792    Ok((
793        grouped_aggregate_execution_specs,
794        grouped_distinct_execution_strategy,
795    ))
796}
797
798fn extend_grouped_having_aggregate_specs(
799    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
800    grouped: &GroupPlan,
801) -> Result<(), InternalError> {
802    if let Some(having_expr) = grouped.having_expr.as_ref() {
803        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
804    }
805
806    Ok(())
807}
808
809fn collect_grouped_having_expr_aggregate_specs(
810    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
811    expr: &Expr,
812) -> Result<(), InternalError> {
813    if !expr.contains_aggregate() {
814        return Ok(());
815    }
816
817    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
818        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
819
820        if aggregate_specs
821            .iter()
822            .all(|current| current != &aggregate_spec)
823        {
824            aggregate_specs.push(aggregate_spec);
825        }
826
827        Ok(())
828    })
829}
830
831fn projected_slot_mask_for_spec(
832    model: &EntityModel,
833    direct_projection_slots: Option<&[usize]>,
834) -> Vec<bool> {
835    let schema_slot_len = direct_projection_slots
836        .and_then(|slots| slots.iter().copied().max())
837        .map_or(0, |slot| slot.saturating_add(1));
838    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
839
840    let Some(direct_projection_slots) = direct_projection_slots else {
841        return projected_slots;
842    };
843
844    for slot in direct_projection_slots.iter().copied() {
845        if let Some(projected) = projected_slots.get_mut(slot) {
846            *projected = true;
847        }
848    }
849
850    projected_slots
851}
852
853fn resolved_order_for_plan(
854    schema_info: &SchemaInfo,
855    plan: &AccessPlannedQuery,
856) -> Result<Option<ResolvedOrder>, InternalError> {
857    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
858        return Ok(None);
859    }
860
861    let Some(order) = plan.scalar_plan().order.as_ref() else {
862        return Ok(None);
863    };
864
865    let mut fields = Vec::with_capacity(order.fields.len());
866    for term in &order.fields {
867        fields.push(ResolvedOrderField::new(
868            resolved_order_value_source_for_term(schema_info, term)?,
869            term.direction(),
870        ));
871    }
872
873    Ok(Some(ResolvedOrder::new(fields)))
874}
875
876fn resolved_order_value_source_for_term(
877    schema_info: &SchemaInfo,
878    term: &crate::db::query::plan::OrderTerm,
879) -> Result<ResolvedOrderValueSource, InternalError> {
880    if term.direct_field().is_none() {
881        let rendered = term.rendered_label();
882        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
883        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
884            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
885
886        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
887            &compiled,
888        )));
889    }
890
891    let field = term
892        .direct_field()
893        .expect("direct-field order branch should only execute for field-backed terms");
894    let slot = resolve_required_schema_slot(schema_info, field, || {
895        InternalError::query_invalid_logical_plan(format!(
896            "order expression references unknown field '{field}'",
897        ))
898    })?;
899
900    Ok(ResolvedOrderValueSource::direct_field(slot))
901}
902
903fn validate_resolved_order_expr_fields(
904    schema_info: &SchemaInfo,
905    expr: &Expr,
906    rendered: &str,
907) -> Result<(), InternalError> {
908    expr.try_for_each_tree_expr(&mut |node| match node {
909        Expr::Field(field_id) => {
910            resolve_required_schema_slot(schema_info, field_id.as_str(), || {
911                InternalError::query_invalid_logical_plan(format!(
912                    "order expression references unknown field '{rendered}'",
913                ))
914            })
915            .map(|_| ())
916        }
917        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
918        #[cfg(test)]
919        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
920        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
921        _ => Ok(()),
922    })
923}
924
925// Resolve one schema-authoritative field slot while keeping planner
926// invalid-logical-plan error construction at the callsite that owns the
927// diagnostic wording.
928fn resolve_required_schema_slot<F>(
929    schema_info: &SchemaInfo,
930    field: &str,
931    invalid_plan_error: F,
932) -> Result<usize, InternalError>
933where
934    F: FnOnce() -> InternalError,
935{
936    schema_info
937        .field_slot_index(field)
938        .ok_or_else(invalid_plan_error)
939}
940
941// Keep the scalar-order expression seam violation text under one helper so the
942// parse validation and compile validation paths do not drift.
943fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
944    InternalError::query_invalid_logical_plan(format!(
945        "order expression '{rendered}' did not stay on the scalar expression seam",
946    ))
947}
948
949// Keep one stable executor-facing slot list for grouped order terms after the
950// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
951// now consumes this same referenced-slot contract instead of re-deriving order
952// sources from planner strategy at runtime.
953fn order_referenced_slots_for_resolved_order(
954    resolved_order: Option<&ResolvedOrder>,
955) -> Option<Vec<usize>> {
956    Some(resolved_order?.referenced_slots())
957}
958
959fn slot_map_for_schema_plan(
960    schema_info: &SchemaInfo,
961    plan: &AccessPlannedQuery,
962) -> Option<Vec<usize>> {
963    let executable = plan.access.executable_contract();
964
965    resolved_index_slots_for_access_path(schema_info, &executable)
966}
967
968fn resolved_index_slots_for_access_path(
969    schema_info: &SchemaInfo,
970    access: &ExecutableAccessPlan<'_, crate::value::Value>,
971) -> Option<Vec<usize>> {
972    let path = access.as_path()?;
973    let path_facts = path.shape_facts();
974    let key_items = path_facts.index_key_items_for_slot_map()?;
975    let mut slots = Vec::new();
976
977    match key_items.key_items() {
978        SemanticIndexKeyItemsRef::Fields(fields) => {
979            slots.reserve(fields.len());
980            for field_name in fields {
981                let slot = schema_info.field_slot_index(field_name)?;
982                slots.push(slot);
983            }
984        }
985        SemanticIndexKeyItemsRef::Accepted(items) => {
986            slots.reserve(items.len());
987            for key_item in items {
988                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
989                slots.push(slot);
990            }
991        }
992        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
993            slots.reserve(fields.len());
994            for &field_name in fields {
995                let slot = schema_info.field_slot_index(field_name)?;
996                slots.push(slot);
997            }
998        }
999        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1000            slots.reserve(items.len());
1001            for key_item in items {
1002                let slot = schema_info.field_slot_index(key_item.field())?;
1003                slots.push(slot);
1004            }
1005        }
1006    }
1007
1008    Some(slots)
1009}
1010
1011fn index_compile_targets_for_schema_plan(
1012    schema_info: &SchemaInfo,
1013    plan: &AccessPlannedQuery,
1014) -> Option<Vec<IndexCompileTarget>> {
1015    let executable = plan.access.executable_contract();
1016    let path = executable.as_path()?;
1017    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1018    let mut targets = Vec::new();
1019
1020    match key_items.key_items() {
1021        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1022            return None;
1023        }
1024        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1025            for (component_index, &field_name) in fields.iter().enumerate() {
1026                let field_slot = schema_info.field_slot_index(field_name)?;
1027                targets.push(IndexCompileTarget {
1028                    component_index,
1029                    field_slot,
1030                    key_item: IndexKeyItem::Field(field_name),
1031                });
1032            }
1033        }
1034        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1035            for (component_index, &key_item) in items.iter().enumerate() {
1036                let field_slot = schema_info.field_slot_index(key_item.field())?;
1037                targets.push(IndexCompileTarget {
1038                    component_index,
1039                    field_slot,
1040                    key_item,
1041                });
1042            }
1043        }
1044    }
1045
1046    Some(targets)
1047}