Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9    db::{
10        access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12        query::plan::{
13            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16            LogicalPlan, PlannerRouteProfile, QueryMode, ResidualFilterContract,
17            ResidualFilterShape, ResolvedOrder, ResolvedOrderField, ResolvedOrderValueSource,
18            ScalarPlan, StaticExecutionPlanningContract, derive_logical_pushdown_eligibility,
19            expr::{
20                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
21                compile_scalar_projection_plan_with_schema,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy,
25            lower_data_row_direct_projection_slots_with_schema,
26            lower_direct_projection_slots_with_schema, lower_projection_identity,
27            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
28            residual_query_predicate_after_filtered_access_contract,
29            resolved_grouped_distinct_execution_strategy_with_schema_info,
30        },
31        schema::SchemaInfo,
32    },
33    error::InternalError,
34    model::{
35        entity::EntityModel,
36        index::{IndexKeyItem, IndexKeyItemsRef},
37    },
38};
39
40impl QueryMode {
41    /// True if this mode represents a load intent.
42    #[must_use]
43    pub const fn is_load(&self) -> bool {
44        match self {
45            Self::Load(_) => true,
46            Self::Delete(_) => false,
47        }
48    }
49
50    /// True if this mode represents a delete intent.
51    #[must_use]
52    pub const fn is_delete(&self) -> bool {
53        match self {
54            Self::Delete(_) => true,
55            Self::Load(_) => false,
56        }
57    }
58}
59
60impl LogicalPlan {
61    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
62    #[must_use]
63    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64        match self {
65            Self::Scalar(plan) => plan,
66            Self::Grouped(plan) => &plan.scalar,
67        }
68    }
69
70    /// Borrow scalar semantic fields mutably across logical variants for tests.
71    #[must_use]
72    #[cfg(test)]
73    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74        match self {
75            Self::Scalar(plan) => plan,
76            Self::Grouped(plan) => &mut plan.scalar,
77        }
78    }
79
80    /// Test-only shorthand for explicit scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84        self.scalar_semantics()
85    }
86
87    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
88    #[must_use]
89    #[cfg(test)]
90    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91        self.scalar_semantics_mut()
92    }
93}
94
95impl AccessPlannedQuery {
96    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
97    #[must_use]
98    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99        self.logical.scalar_semantics()
100    }
101
102    /// Borrow scalar missing-row consistency without exposing the full scalar
103    /// plan to executor owners that only need row-presence policy.
104    #[must_use]
105    #[cfg(any(test, feature = "sql"))]
106    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
107        self.scalar_plan().consistency
108    }
109
110    /// Borrow scalar semantic fields mutably across logical variants for tests.
111    #[must_use]
112    #[cfg(test)]
113    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
114        self.logical.scalar_semantics_mut()
115    }
116
117    /// Test-only shorthand for explicit scalar plan borrowing.
118    #[must_use]
119    #[cfg(test)]
120    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
121        self.scalar_plan()
122    }
123
124    /// Test-only shorthand for explicit mutable scalar plan borrowing.
125    #[must_use]
126    #[cfg(test)]
127    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
128        self.scalar_plan_mut()
129    }
130
131    /// Borrow grouped semantic fields when this plan is grouped.
132    #[must_use]
133    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
134        match &self.logical {
135            LogicalPlan::Scalar(_) => None,
136            LogicalPlan::Grouped(plan) => Some(plan),
137        }
138    }
139
140    /// Lower this plan into one canonical planner-owned projection semantic spec.
141    #[must_use]
142    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
143        if let Some(static_contract) = &self.static_execution_planning_contract {
144            return static_contract.projection_spec.clone();
145        }
146
147        lower_projection_intent(model, &self.logical, &self.projection_selection)
148    }
149
150    /// Lower this plan into one projection semantic shape for identity hashing.
151    #[must_use]
152    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
153        lower_projection_identity(&self.logical, &self.projection_selection)
154    }
155
156    /// Return the executor-facing predicate after removing only filtered-index
157    /// guard clauses the chosen access path already proves.
158    ///
159    /// This conservative form is used by preparation/explain surfaces that
160    /// still need to see access-bound equalities as index-predicate input.
161    #[must_use]
162    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
163        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
164            return static_contract.execution_preparation_predicate.clone();
165        }
166
167        derive_execution_preparation_predicate(self)
168    }
169
170    /// Return the executor-facing residual predicate after removing any
171    /// filtered-index guard clauses and fixed access-bound equalities already
172    /// guaranteed by the chosen path.
173    #[must_use]
174    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
175        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
176            return static_contract
177                .residual_filter_contract
178                .residual_filter_predicate()
179                .cloned();
180        }
181
182        derive_residual_filter_predicate(self)
183    }
184
185    /// Return whether one explicit residual predicate survives access
186    /// planning and still participates in residual execution.
187    #[must_use]
188    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
189        self.effective_execution_predicate().is_some()
190    }
191
192    /// Borrow the planner-owned residual scalar filter expression when one
193    /// surviving semantic remainder still requires runtime evaluation.
194    #[must_use]
195    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
196        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
197            return static_contract
198                .residual_filter_contract
199                .residual_filter_expr();
200        }
201
202        if !derive_has_residual_filter(self) {
203            return None;
204        }
205
206        self.scalar_plan().filter_expr.as_ref()
207    }
208
209    /// Return whether one explicit residual scalar filter expression survives
210    /// access planning and still requires runtime evaluation.
211    #[must_use]
212    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
213        self.residual_filter_expr().is_some()
214    }
215
216    /// Return the planner-owned residual-filter shape used by diagnostics.
217    #[must_use]
218    pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
219        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
220            return static_contract.residual_filter_contract.shape();
221        }
222
223        ResidualFilterShape::from_presence(
224            self.residual_filter_expr().is_some(),
225            self.effective_execution_predicate().is_some(),
226        )
227    }
228
229    /// Borrow the planner-compiled execution-preparation predicate program.
230    #[must_use]
231    pub(in crate::db) const fn execution_preparation_compiled_predicate(
232        &self,
233    ) -> Option<&PredicateProgram> {
234        self.static_execution_planning_contract()
235            .execution_preparation_compiled_predicate
236            .as_ref()
237    }
238
239    /// Borrow the planner-compiled effective runtime predicate program.
240    #[must_use]
241    pub(in crate::db) const fn effective_runtime_compiled_predicate(
242        &self,
243    ) -> Option<&PredicateProgram> {
244        match self
245            .static_execution_planning_contract()
246            .residual_filter_contract
247            .effective_runtime_filter_program()
248        {
249            Some(program) => program.predicate_program(),
250            None => None,
251        }
252    }
253
254    /// Borrow the planner-compiled effective runtime scalar filter expression.
255    #[cfg(test)]
256    #[must_use]
257    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
258        &self,
259    ) -> Option<&CompiledExpr> {
260        match self
261            .static_execution_planning_contract()
262            .residual_filter_contract
263            .effective_runtime_filter_program()
264        {
265            Some(program) => program.expression_filter(),
266            None => None,
267        }
268    }
269
270    /// Borrow the planner-frozen effective runtime scalar filter program.
271    #[must_use]
272    pub(in crate::db) const fn effective_runtime_filter_program(
273        &self,
274    ) -> Option<&EffectiveRuntimeFilterProgram> {
275        self.static_execution_planning_contract()
276            .residual_filter_contract
277            .effective_runtime_filter_program()
278    }
279
280    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
281    #[must_use]
282    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
283        if !self.scalar_plan().distinct {
284            return DistinctExecutionStrategy::None;
285        }
286
287        // DISTINCT on duplicate-safe single-path access shapes is a planner
288        // no-op for runtime dedup mechanics. Composite shapes can surface
289        // duplicate keys and therefore retain explicit dedup execution.
290        match distinct_runtime_dedup_strategy(&self.access) {
291            Some(strategy) => strategy,
292            None => DistinctExecutionStrategy::None,
293        }
294    }
295
296    /// Freeze one planner-owned route profile after model validation completes.
297    #[cfg(test)]
298    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
299        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
300    }
301
302    /// Freeze one planner-owned route profile from accepted schema authority.
303    pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
304        &mut self,
305        schema_info: &SchemaInfo,
306    ) {
307        self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
308    }
309
310    /// Freeze model-only executor metadata after logical/access planning completes.
311    #[cfg(test)]
312    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
313        &mut self,
314        model: &EntityModel,
315    ) -> Result<(), InternalError> {
316        self.finalize_static_execution_planning_contract_for_model_with_schema(
317            model,
318            SchemaInfo::cached_for_generated_entity_model(model),
319        )
320    }
321
322    /// Freeze planner-owned executor metadata with explicit schema authority.
323    pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
324        &mut self,
325        model: &EntityModel,
326        schema_info: &SchemaInfo,
327    ) -> Result<(), InternalError> {
328        self.static_execution_planning_contract = Some(
329            project_static_execution_planning_contract_for_model(model, schema_info, self)?,
330        );
331
332        Ok(())
333    }
334
335    /// Build one immutable execution-shape signature contract for runtime layers.
336    #[must_use]
337    pub(in crate::db) fn execution_shape_signature(
338        &self,
339        entity_path: &'static str,
340    ) -> ExecutionShapeSignature {
341        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
342    }
343
344    /// Return whether the chosen access contract fully satisfies the current
345    /// scalar query predicate without any additional post-access filtering.
346    #[must_use]
347    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
348        if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
349            return self.scalar_plan().predicate.is_some()
350                && !static_contract
351                    .residual_filter_contract
352                    .has_residual_filter();
353        }
354
355        derive_predicate_fully_satisfied_by_access_contract(self)
356    }
357
358    /// Borrow the planner-frozen compiled scalar projection program.
359    #[must_use]
360    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
361        self.static_execution_planning_contract()
362            .scalar_projection_plan
363            .as_deref()
364    }
365
366    /// Return whether planner-owned static execution metadata has already been frozen.
367    #[must_use]
368    pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
369        self.static_execution_planning_contract.is_some()
370    }
371
372    /// Borrow the planner-frozen ordered primary-key field names.
373    #[must_use]
374    pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
375        self.static_execution_planning_contract()
376            .primary_key_names
377            .iter()
378            .map(String::as_str)
379            .collect()
380    }
381
382    /// Borrow the planner-frozen projection slot reachability set.
383    #[must_use]
384    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
385        self.static_execution_planning_contract()
386            .projection_referenced_slots
387            .as_slice()
388    }
389
390    /// Borrow the planner-frozen mask for direct projected output slots.
391    #[must_use]
392    #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
393    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
394        self.static_execution_planning_contract()
395            .projected_slot_mask
396            .as_slice()
397    }
398
399    /// Return whether projection remains the full model-identity field list.
400    #[must_use]
401    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
402        self.static_execution_planning_contract()
403            .projection_is_model_identity
404    }
405
406    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
407    #[must_use]
408    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
409        self.static_execution_planning_contract()
410            .order_referenced_slots
411            .as_deref()
412    }
413
414    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
415    #[must_use]
416    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
417        self.static_execution_planning_contract()
418            .resolved_order
419            .as_ref()
420    }
421
422    /// Borrow the planner-frozen access slot map used by index predicate compilation.
423    #[must_use]
424    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
425        self.static_execution_planning_contract()
426            .slot_map
427            .as_deref()
428    }
429
430    /// Borrow grouped aggregate execution specs already resolved during static planning.
431    #[must_use]
432    pub(in crate::db) fn grouped_aggregate_execution_specs(
433        &self,
434    ) -> Option<&[GroupedAggregateExecutionSpec]> {
435        self.static_execution_planning_contract()
436            .grouped_aggregate_execution_specs
437            .as_deref()
438    }
439
440    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
441    #[must_use]
442    pub(in crate::db) const fn grouped_distinct_execution_strategy(
443        &self,
444    ) -> Option<&GroupedDistinctExecutionStrategy> {
445        self.static_execution_planning_contract()
446            .grouped_distinct_execution_strategy
447            .as_ref()
448    }
449
450    /// Borrow the frozen projection semantic shape without reopening model ownership.
451    #[must_use]
452    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
453        &self.static_execution_planning_contract().projection_spec
454    }
455
456    /// Borrow the frozen direct projection slots without reopening model ownership.
457    #[must_use]
458    #[cfg(any(test, feature = "sql"))]
459    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
460        self.static_execution_planning_contract()
461            .projection_direct_slots
462            .as_deref()
463    }
464
465    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
466    #[must_use]
467    #[cfg(any(test, feature = "sql"))]
468    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
469        self.static_execution_planning_contract()
470            .projection_data_row_direct_slots
471            .as_deref()
472    }
473
474    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
475    #[must_use]
476    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
477        self.static_execution_planning_contract()
478            .index_compile_targets
479            .as_deref()
480    }
481
482    const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
483        self.static_execution_planning_contract
484            .as_ref()
485            .expect("query semantics invariant")
486    }
487}
488
489fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
490    match access {
491        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
492            Some(DistinctExecutionStrategy::PreOrdered)
493        }
494        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
495            Some(DistinctExecutionStrategy::HashMaterialize)
496        }
497        AccessPlan::Path(_) => None,
498    }
499}
500
501fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
502    let is_grouped_safe = plan
503        .grouped_plan()
504        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
505
506    ContinuationPolicy::new(
507        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
508        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
509        is_grouped_safe,
510    )
511}
512
/// Build the planner route profile for a finalized logical+access plan, taking
/// primary-key names from the entity model (test builds only).
#[must_use]
#[cfg(test)]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let pk_names = ordered_primary_key_names(model);

    // Only plans with an ORDER BY can yield a secondary order contract.
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => order.deterministic_secondary_order_contract_fields(pk_names.as_slice()),
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
531
532/// Project one planner-owned route profile from accepted schema authority.
533#[must_use]
534pub(in crate::db) fn project_planner_route_profile_for_schema(
535    schema_info: &SchemaInfo,
536    plan: &AccessPlannedQuery,
537) -> PlannerRouteProfile {
538    let primary_key_names = primary_key_names_from_schema(schema_info);
539    let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
540        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
541    });
542
543    PlannerRouteProfile::new(
544        derive_continuation_policy_validated(plan),
545        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
546        secondary_order_contract,
547    )
548}
549
550fn project_static_execution_planning_contract_for_model(
551    model: &EntityModel,
552    schema_info: &SchemaInfo,
553    plan: &AccessPlannedQuery,
554) -> Result<StaticExecutionPlanningContract, InternalError> {
555    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
556    let execution_preparation_predicate = plan.execution_preparation_predicate();
557    let residual_filter_predicate = derive_residual_filter_predicate(plan);
558    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
559    let execution_preparation_compiled_predicate =
560        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
561    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
562        schema_info,
563        residual_filter_expr.as_ref(),
564        residual_filter_predicate.as_ref(),
565    )?;
566    let residual_filter_contract = ResidualFilterContract::new(
567        residual_filter_expr,
568        residual_filter_predicate,
569        effective_runtime_filter_program,
570    );
571    let scalar_projection_plan = if plan.grouped_plan().is_none() {
572        Some(
573            compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
574                .ok_or_else(InternalError::query_executor_invariant)?
575                .iter()
576                .map(CompiledExpr::compile)
577                .collect(),
578        )
579    } else {
580        None
581    };
582    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
583        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
584    let projection_direct_slots = lower_direct_projection_slots_with_schema(
585        model,
586        schema_info,
587        &plan.logical,
588        &plan.projection_selection,
589    );
590    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
591        model,
592        schema_info,
593        &plan.logical,
594        &plan.projection_selection,
595    );
596    let projection_referenced_slots =
597        projection_spec.referenced_slots_for_schema(model, schema_info)?;
598    let projected_slot_mask =
599        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
600    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
601    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
602    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
603    let slot_map = slot_map_for_schema_plan(schema_info, plan);
604    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
605
606    Ok(StaticExecutionPlanningContract {
607        primary_key_names: schema_info.primary_key_names().to_vec(),
608        projection_spec,
609        execution_preparation_predicate,
610        execution_preparation_compiled_predicate,
611        residual_filter_contract,
612        scalar_projection_plan,
613        grouped_aggregate_execution_specs,
614        grouped_distinct_execution_strategy,
615        projection_direct_slots,
616        projection_data_row_direct_slots,
617        projection_referenced_slots,
618        projected_slot_mask,
619        projection_is_model_identity,
620        resolved_order,
621        order_referenced_slots,
622        slot_map,
623        index_compile_targets,
624    })
625}
626
// Test-only helper: return the primary-key field names reported by the entity
// model, exactly as `EntityModel::primary_key_names` yields them.
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    model.primary_key_names()
}
631
632fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
633    schema_info
634        .primary_key_names()
635        .iter()
636        .map(String::as_str)
637        .collect()
638}
639
640// Compile the executor-owned residual scalar filter contract once from the
641// planner-derived residual artifacts so runtime never has to rediscover
642// residual presence or shape from semantic/filter/pushdown state.
643fn compile_effective_runtime_filter_program(
644    schema_info: &SchemaInfo,
645    residual_filter_expr: Option<&Expr>,
646    residual_filter_predicate: Option<&Predicate>,
647) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
648    // Keep the existing predicate fast path when the residual semantics still
649    // fit the derived predicate contract. The expression-owned lane is only
650    // needed once pushdown loses semantic coverage and a residual predicate no
651    // longer exists.
652    if let Some(predicate) = residual_filter_predicate {
653        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
654            PredicateProgram::compile_with_schema_info(schema_info, predicate),
655        )));
656    }
657
658    if let Some(filter_expr) = residual_filter_expr {
659        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
660            .ok_or_else(InternalError::query_invalid_logical_plan)?;
661
662        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
663            CompiledExpr::compile(&compiled),
664        )));
665    }
666
667    Ok(None)
668}
669
670// Derive the executor-preparation predicate once from the selected access path.
671// This strips only filtered-index guard clauses while preserving access-bound
672// equalities that still matter to preparation/explain consumers.
673fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
674    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
675
676    match plan.access.selected_index_contract() {
677        Some(index) => {
678            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
679        }
680        None => Some(query_predicate.clone()),
681    }
682}
683
684// Derive the final residual predicate once from the already-filtered
685// preparation predicate plus any equality bounds guaranteed by the concrete
686// access path.
687fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
688    let filtered_residual = derive_execution_preparation_predicate(plan);
689    let filtered_residual = filtered_residual.as_ref()?;
690
691    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
692}
693
694// Derive the explicit residual semantic expression once for finalized plans.
695// The residual expression remains the planner-owned semantic filter when any
696// runtime filtering still survives access satisfaction.
697fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
698    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
699    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
700        return None;
701    }
702
703    Some(filter_expr.clone())
704}
705
706// Derive the explicit residual semantic expression during finalization using
707// the trusted entity schema so compare-family literal normalization matches the
708// planner-owned predicate contract before residual ownership is decided.
709fn derive_residual_filter_expr_for_model(
710    model: &EntityModel,
711    plan: &AccessPlannedQuery,
712) -> Option<Expr> {
713    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
714    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
715        return None;
716    }
717
718    Some(filter_expr.clone())
719}
720
721// Return whether any residual filtering survives after access planning. This
722// helper exists only for pre-finalization assembly; finalized plans must read
723// the explicit residual artifacts frozen in `StaticExecutionPlanningContract`.
724fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
725    match (
726        plan.scalar_plan().filter_expr.as_ref(),
727        plan.scalar_plan().predicate.as_ref(),
728    ) {
729        (None, None) => false,
730        (Some(_), None) => true,
731        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
732    }
733}
734
735// Return true when the planner-owned predicate contract is fully satisfied by
736// access planning and no semantic residual filter expression survives.
737fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
738    plan.scalar_plan().predicate.is_some()
739        && derive_residual_filter_predicate(plan).is_none()
740        && derive_residual_filter_expr(plan).is_none()
741}
742
743// Return true when the semantic filter expression is entirely represented by
744// the planner-owned predicate contract and the chosen access path satisfies
745// that predicate without any runtime remainder.
746const fn derive_semantic_filter_fully_satisfied_by_access_contract(
747    plan: &AccessPlannedQuery,
748) -> bool {
749    plan.scalar_plan().filter_expr.is_some()
750        && plan.scalar_plan().predicate.is_some()
751        && plan.scalar_plan().predicate_covers_filter_expr
752}
753
// Finalization-time entry point for the semantic-filter satisfaction check.
// It accepts the trusted entity model so finalization callers can pass it, but
// the model is currently unused: the result is exactly what
// `derive_semantic_filter_fully_satisfied_by_access_contract` returns for the
// plan.
// NOTE(review): the earlier comment described aligning compare literals
// through the entity schema; no such normalization happens in this function.
// Confirm whether that work lives elsewhere or is still pending.
const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
    _model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
}
763
764// Compile one optional planner-frozen predicate program while keeping the
765// static planning assembly path free of repeated `Option` mapping boilerplate.
766fn compile_optional_predicate(
767    schema_info: &SchemaInfo,
768    predicate: Option<&Predicate>,
769) -> Option<PredicateProgram> {
770    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
771}
772
773// Resolve the grouped-only static planning semantics bundle once so grouped
774// aggregate execution specs and grouped DISTINCT strategy stay derived under
775// one shared grouped-plan branch.
776fn resolve_grouped_static_planning_semantics(
777    schema_info: &SchemaInfo,
778    plan: &AccessPlannedQuery,
779    projection_spec: &ProjectionSpec,
780) -> Result<
781    (
782        Option<Vec<GroupedAggregateExecutionSpec>>,
783        Option<GroupedDistinctExecutionStrategy>,
784    ),
785    InternalError,
786> {
787    let Some(grouped) = plan.grouped_plan() else {
788        return Ok((None, None));
789    };
790
791    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
792        projection_spec,
793        grouped.group.group_fields.as_slice(),
794        grouped.group.aggregates.as_slice(),
795    )?;
796    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
797
798    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
799        schema_info,
800        aggregate_specs.as_slice(),
801    )?);
802    let grouped_distinct_execution_strategy = Some(
803        resolved_grouped_distinct_execution_strategy_with_schema_info(
804            schema_info,
805            grouped.group.group_fields.as_slice(),
806            grouped.group.aggregates.as_slice(),
807            grouped.having_expr.as_ref(),
808        )?,
809    );
810
811    Ok((
812        grouped_aggregate_execution_specs,
813        grouped_distinct_execution_strategy,
814    ))
815}
816
817fn extend_grouped_having_aggregate_specs(
818    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
819    grouped: &GroupPlan,
820) -> Result<(), InternalError> {
821    if let Some(having_expr) = grouped.having_expr.as_ref() {
822        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
823    }
824
825    Ok(())
826}
827
828fn collect_grouped_having_expr_aggregate_specs(
829    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
830    expr: &Expr,
831) -> Result<(), InternalError> {
832    if !expr.contains_aggregate() {
833        return Ok(());
834    }
835
836    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
837        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
838
839        if aggregate_specs
840            .iter()
841            .all(|current| current != &aggregate_spec)
842        {
843            aggregate_specs.push(aggregate_spec);
844        }
845
846        Ok(())
847    })
848}
849
850fn projected_slot_mask_for_spec(
851    model: &EntityModel,
852    direct_projection_slots: Option<&[usize]>,
853) -> Vec<bool> {
854    let schema_slot_len = direct_projection_slots
855        .and_then(|slots| slots.iter().copied().max())
856        .map_or(0, |slot| slot.saturating_add(1));
857    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
858
859    let Some(direct_projection_slots) = direct_projection_slots else {
860        return projected_slots;
861    };
862
863    for slot in direct_projection_slots.iter().copied() {
864        if let Some(projected) = projected_slots.get_mut(slot) {
865            *projected = true;
866        }
867    }
868
869    projected_slots
870}
871
872fn resolved_order_for_plan(
873    schema_info: &SchemaInfo,
874    plan: &AccessPlannedQuery,
875) -> Result<Option<ResolvedOrder>, InternalError> {
876    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
877        return Ok(None);
878    }
879
880    let Some(order) = plan.scalar_plan().order.as_ref() else {
881        return Ok(None);
882    };
883
884    let mut fields = Vec::with_capacity(order.fields.len());
885    for term in &order.fields {
886        fields.push(ResolvedOrderField::new(
887            resolved_order_value_source_for_term(schema_info, term)?,
888            term.direction(),
889        ));
890    }
891
892    Ok(Some(ResolvedOrder::new(fields)))
893}
894
895fn resolved_order_value_source_for_term(
896    schema_info: &SchemaInfo,
897    term: &crate::db::query::plan::OrderTerm,
898) -> Result<ResolvedOrderValueSource, InternalError> {
899    if term.direct_field().is_none() {
900        let rendered = term.rendered_label();
901        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
902        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
903            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
904
905        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
906            &compiled,
907        )));
908    }
909
910    let field = term.direct_field().expect("query semantics invariant");
911    let slot = resolve_required_schema_slot(
912        schema_info,
913        field,
914        InternalError::query_invalid_logical_plan,
915    )?;
916
917    Ok(ResolvedOrderValueSource::direct_field(slot))
918}
919
920fn validate_resolved_order_expr_fields(
921    schema_info: &SchemaInfo,
922    expr: &Expr,
923    rendered: &str,
924) -> Result<(), InternalError> {
925    expr.try_for_each_tree_expr(&mut |node| match node {
926        Expr::Field(field_id) => resolve_required_schema_slot(
927            schema_info,
928            field_id.as_str(),
929            InternalError::query_invalid_logical_plan,
930        )
931        .map(|_| ()),
932        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
933        #[cfg(test)]
934        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
935        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
936        _ => Ok(()),
937    })
938}
939
940// Resolve one schema-authoritative field slot while keeping planner
941// invalid-logical-plan error construction at the callsite that owns the
942// diagnostic wording.
943fn resolve_required_schema_slot<F>(
944    schema_info: &SchemaInfo,
945    field: &str,
946    invalid_plan_error: F,
947) -> Result<usize, InternalError>
948where
949    F: FnOnce() -> InternalError,
950{
951    schema_info
952        .field_slot_index(field)
953        .ok_or_else(invalid_plan_error)
954}
955
956// Keep the scalar-order expression seam violation text under one helper so the
957// parse validation and compile validation paths do not drift.
958fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
959    InternalError::query_invalid_logical_plan()
960}
961
962// Keep one stable executor-facing slot list for grouped order terms after the
963// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
964// now consumes this same referenced-slot contract instead of re-deriving order
965// sources from planner strategy at runtime.
966fn order_referenced_slots_for_resolved_order(
967    resolved_order: Option<&ResolvedOrder>,
968) -> Option<Vec<usize>> {
969    Some(resolved_order?.referenced_slots())
970}
971
972fn slot_map_for_schema_plan(
973    schema_info: &SchemaInfo,
974    plan: &AccessPlannedQuery,
975) -> Option<Vec<usize>> {
976    let executable = plan.access.executable_contract();
977
978    resolved_index_slots_for_access_path(schema_info, &executable)
979}
980
981fn resolved_index_slots_for_access_path(
982    schema_info: &SchemaInfo,
983    access: &ExecutableAccessPlan<'_, crate::value::Value>,
984) -> Option<Vec<usize>> {
985    let path = access.as_path()?;
986    let path_facts = path.shape_facts();
987    let key_items = path_facts.index_key_items_for_slot_map()?;
988    let mut slots = Vec::new();
989
990    match key_items.key_items() {
991        SemanticIndexKeyItemsRef::Fields(fields) => {
992            slots.reserve(fields.len());
993            for field_name in fields {
994                let slot = schema_info.field_slot_index(field_name)?;
995                slots.push(slot);
996            }
997        }
998        SemanticIndexKeyItemsRef::Accepted(items) => {
999            slots.reserve(items.len());
1000            for key_item in items {
1001                let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1002                slots.push(slot);
1003            }
1004        }
1005        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1006            slots.reserve(fields.len());
1007            for &field_name in fields {
1008                let slot = schema_info.field_slot_index(field_name)?;
1009                slots.push(slot);
1010            }
1011        }
1012        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1013            slots.reserve(items.len());
1014            for key_item in items {
1015                let slot = schema_info.field_slot_index(key_item.field())?;
1016                slots.push(slot);
1017            }
1018        }
1019    }
1020
1021    Some(slots)
1022}
1023
1024fn index_compile_targets_for_schema_plan(
1025    schema_info: &SchemaInfo,
1026    plan: &AccessPlannedQuery,
1027) -> Option<Vec<IndexCompileTarget>> {
1028    let executable = plan.access.executable_contract();
1029    let path = executable.as_path()?;
1030    let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1031    let mut targets = Vec::new();
1032
1033    match key_items.key_items() {
1034        SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1035            return None;
1036        }
1037        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1038            for (component_index, &field_name) in fields.iter().enumerate() {
1039                let field_slot = schema_info.field_slot_index(field_name)?;
1040                targets.push(IndexCompileTarget {
1041                    component_index,
1042                    field_slot,
1043                    key_item: IndexKeyItem::Field(field_name),
1044                });
1045            }
1046        }
1047        SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1048            for (component_index, &key_item) in items.iter().enumerate() {
1049                let field_slot = schema_info.field_slot_index(key_item.field())?;
1050                targets.push(IndexCompileTarget {
1051                    component_index,
1052                    field_slot,
1053                    key_item,
1054                });
1055            }
1056        }
1057    }
1058
1059    Some(targets)
1060}