Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr,
19                compile_scalar_projection_plan,
20            },
21            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
23            lower_projection_identity, lower_projection_intent,
24            residual_query_predicate_after_access_path_bounds,
25            residual_query_predicate_after_filtered_access,
26            resolved_grouped_distinct_execution_strategy_for_model,
27        },
28    },
29    error::InternalError,
30    model::{entity::EntityModel, index::IndexKeyItemsRef},
31};
32
33impl QueryMode {
34    /// True if this mode represents a load intent.
35    #[must_use]
36    pub const fn is_load(&self) -> bool {
37        match self {
38            Self::Load(_) => true,
39            Self::Delete(_) => false,
40        }
41    }
42
43    /// True if this mode represents a delete intent.
44    #[must_use]
45    pub const fn is_delete(&self) -> bool {
46        match self {
47            Self::Delete(_) => true,
48            Self::Load(_) => false,
49        }
50    }
51}
52
53impl LogicalPlan {
54    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
55    #[must_use]
56    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
57        match self {
58            Self::Scalar(plan) => plan,
59            Self::Grouped(plan) => &plan.scalar,
60        }
61    }
62
63    /// Borrow scalar semantic fields mutably across logical variants for tests.
64    #[must_use]
65    #[cfg(test)]
66    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
67        match self {
68            Self::Scalar(plan) => plan,
69            Self::Grouped(plan) => &mut plan.scalar,
70        }
71    }
72
73    /// Test-only shorthand for explicit scalar semantic borrowing.
74    #[must_use]
75    #[cfg(test)]
76    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
77        self.scalar_semantics()
78    }
79
80    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
84        self.scalar_semantics_mut()
85    }
86}
87
88impl AccessPlannedQuery {
89    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
90    #[must_use]
91    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
92        self.logical.scalar_semantics()
93    }
94
95    /// Borrow scalar missing-row consistency without exposing the full scalar
96    /// plan to executor owners that only need row-presence policy.
97    #[must_use]
98    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
99        self.scalar_plan().consistency
100    }
101
102    /// Borrow scalar semantic fields mutably across logical variants for tests.
103    #[must_use]
104    #[cfg(test)]
105    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
106        self.logical.scalar_semantics_mut()
107    }
108
109    /// Test-only shorthand for explicit scalar plan borrowing.
110    #[must_use]
111    #[cfg(test)]
112    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
113        self.scalar_plan()
114    }
115
116    /// Test-only shorthand for explicit mutable scalar plan borrowing.
117    #[must_use]
118    #[cfg(test)]
119    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
120        self.scalar_plan_mut()
121    }
122
123    /// Borrow grouped semantic fields when this plan is grouped.
124    #[must_use]
125    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
126        match &self.logical {
127            LogicalPlan::Scalar(_) => None,
128            LogicalPlan::Grouped(plan) => Some(plan),
129        }
130    }
131
132    /// Lower this plan into one canonical planner-owned projection semantic spec.
133    #[must_use]
134    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
135        if let Some(static_shape) = &self.static_planning_shape {
136            return static_shape.projection_spec.clone();
137        }
138
139        lower_projection_intent(model, &self.logical, &self.projection_selection)
140    }
141
142    /// Lower this plan into one projection semantic shape for identity hashing.
143    #[must_use]
144    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
145        lower_projection_identity(&self.logical, &self.projection_selection)
146    }
147
148    /// Return the executor-facing predicate after removing only filtered-index
149    /// guard clauses the chosen access path already proves.
150    ///
151    /// This conservative form is used by preparation/explain surfaces that
152    /// still need to see access-bound equalities as index-predicate input.
153    #[must_use]
154    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
155        if let Some(static_shape) = self.static_planning_shape.as_ref() {
156            return static_shape.execution_preparation_predicate.clone();
157        }
158
159        derive_execution_preparation_predicate(self)
160    }
161
162    /// Return the executor-facing residual predicate after removing any
163    /// filtered-index guard clauses and fixed access-bound equalities already
164    /// guaranteed by the chosen path.
165    #[must_use]
166    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
167        if let Some(static_shape) = self.static_planning_shape.as_ref() {
168            return static_shape.residual_filter_predicate.clone();
169        }
170
171        derive_residual_filter_predicate(self)
172    }
173
174    /// Return whether one explicit residual predicate survives access
175    /// planning and still participates in residual execution.
176    #[must_use]
177    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
178        self.effective_execution_predicate().is_some()
179    }
180
181    /// Borrow the planner-owned residual scalar filter expression when one
182    /// surviving semantic remainder still requires runtime evaluation.
183    #[must_use]
184    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
185        if let Some(static_shape) = self.static_planning_shape.as_ref() {
186            return static_shape.residual_filter_expr.as_ref();
187        }
188
189        if !derive_has_residual_filter(self) {
190            return None;
191        }
192
193        self.scalar_plan().filter_expr.as_ref()
194    }
195
196    /// Return whether one explicit residual scalar filter expression survives
197    /// access planning and still requires runtime evaluation.
198    #[must_use]
199    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
200        self.residual_filter_expr().is_some()
201    }
202
203    /// Borrow the planner-compiled execution-preparation predicate program.
204    #[must_use]
205    pub(in crate::db) const fn execution_preparation_compiled_predicate(
206        &self,
207    ) -> Option<&PredicateProgram> {
208        self.static_planning_shape()
209            .execution_preparation_compiled_predicate
210            .as_ref()
211    }
212
213    /// Borrow the planner-compiled effective runtime predicate program.
214    #[must_use]
215    pub(in crate::db) const fn effective_runtime_compiled_predicate(
216        &self,
217    ) -> Option<&PredicateProgram> {
218        match self
219            .static_planning_shape()
220            .effective_runtime_filter_program
221            .as_ref()
222        {
223            Some(program) => program.predicate_program(),
224            None => None,
225        }
226    }
227
228    /// Borrow the planner-compiled effective runtime scalar filter expression.
229    #[cfg(test)]
230    #[must_use]
231    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
232        &self,
233    ) -> Option<&CompiledExpr> {
234        match self
235            .static_planning_shape()
236            .effective_runtime_filter_program
237            .as_ref()
238        {
239            Some(program) => program.expression_filter(),
240            None => None,
241        }
242    }
243
244    /// Borrow the planner-frozen effective runtime scalar filter program.
245    #[must_use]
246    pub(in crate::db) const fn effective_runtime_filter_program(
247        &self,
248    ) -> Option<&EffectiveRuntimeFilterProgram> {
249        self.static_planning_shape()
250            .effective_runtime_filter_program
251            .as_ref()
252    }
253
254    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
255    #[must_use]
256    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
257        if !self.scalar_plan().distinct {
258            return DistinctExecutionStrategy::None;
259        }
260
261        // DISTINCT on duplicate-safe single-path access shapes is a planner
262        // no-op for runtime dedup mechanics. Composite shapes can surface
263        // duplicate keys and therefore retain explicit dedup execution.
264        match distinct_runtime_dedup_strategy(&self.access) {
265            Some(strategy) => strategy,
266            None => DistinctExecutionStrategy::None,
267        }
268    }
269
270    /// Freeze one planner-owned route profile after model validation completes.
271    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
272        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
273    }
274
275    /// Freeze planner-owned executor metadata after logical/access planning completes.
276    pub(in crate::db) fn finalize_static_planning_shape_for_model(
277        &mut self,
278        model: &EntityModel,
279    ) -> Result<(), InternalError> {
280        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
281
282        Ok(())
283    }
284
285    /// Build one immutable execution-shape signature contract for runtime layers.
286    #[must_use]
287    pub(in crate::db) fn execution_shape_signature(
288        &self,
289        entity_path: &'static str,
290    ) -> ExecutionShapeSignature {
291        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
292    }
293
294    /// Return whether the chosen access contract fully satisfies the current
295    /// scalar query predicate without any additional post-access filtering.
296    #[must_use]
297    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
298        if let Some(static_shape) = self.static_planning_shape.as_ref() {
299            return self.scalar_plan().predicate.is_some()
300                && static_shape.residual_filter_predicate.is_none()
301                && static_shape.residual_filter_expr.is_none();
302        }
303
304        derive_predicate_fully_satisfied_by_access_contract(self)
305    }
306
307    /// Borrow the planner-frozen compiled scalar projection program.
308    #[must_use]
309    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
310        self.static_planning_shape()
311            .scalar_projection_plan
312            .as_deref()
313    }
314
315    /// Borrow the planner-frozen primary-key field name.
316    #[must_use]
317    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
318        self.static_planning_shape().primary_key_name
319    }
320
321    /// Borrow the planner-frozen projection slot reachability set.
322    #[must_use]
323    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
324        self.static_planning_shape()
325            .projection_referenced_slots
326            .as_slice()
327    }
328
329    /// Borrow the planner-frozen mask for direct projected output slots.
330    #[must_use]
331    #[cfg(any(test, feature = "diagnostics"))]
332    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
333        self.static_planning_shape().projected_slot_mask.as_slice()
334    }
335
336    /// Return whether projection remains the full model-identity field list.
337    #[must_use]
338    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
339        self.static_planning_shape().projection_is_model_identity
340    }
341
342    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
343    #[must_use]
344    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
345        self.static_planning_shape()
346            .order_referenced_slots
347            .as_deref()
348    }
349
350    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
351    #[must_use]
352    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
353        self.static_planning_shape().resolved_order.as_ref()
354    }
355
356    /// Borrow the planner-frozen access slot map used by index predicate compilation.
357    #[must_use]
358    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
359        self.static_planning_shape().slot_map.as_deref()
360    }
361
362    /// Borrow grouped aggregate execution specs already resolved during static planning.
363    #[must_use]
364    pub(in crate::db) fn grouped_aggregate_execution_specs(
365        &self,
366    ) -> Option<&[GroupedAggregateExecutionSpec]> {
367        self.static_planning_shape()
368            .grouped_aggregate_execution_specs
369            .as_deref()
370    }
371
372    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
373    #[must_use]
374    pub(in crate::db) const fn grouped_distinct_execution_strategy(
375        &self,
376    ) -> Option<&GroupedDistinctExecutionStrategy> {
377        self.static_planning_shape()
378            .grouped_distinct_execution_strategy
379            .as_ref()
380    }
381
382    /// Borrow the frozen projection semantic shape without reopening model ownership.
383    #[must_use]
384    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
385        &self.static_planning_shape().projection_spec
386    }
387
388    /// Borrow the frozen direct projection slots without reopening model ownership.
389    #[must_use]
390    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
391        self.static_planning_shape()
392            .projection_direct_slots
393            .as_deref()
394    }
395
396    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
397    #[must_use]
398    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
399        self.static_planning_shape()
400            .index_compile_targets
401            .as_deref()
402    }
403
404    const fn static_planning_shape(&self) -> &StaticPlanningShape {
405        self.static_planning_shape
406            .as_ref()
407            .expect("access-planned queries must freeze static planning shape before execution")
408    }
409}
410
411fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
412    match access {
413        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
414            Some(DistinctExecutionStrategy::PreOrdered)
415        }
416        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
417            Some(DistinctExecutionStrategy::HashMaterialize)
418        }
419        AccessPlan::Path(_) => None,
420    }
421}
422
423fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
424    let is_grouped_safe = plan
425        .grouped_plan()
426        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
427
428    ContinuationPolicy::new(
429        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
430        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
431        is_grouped_safe,
432    )
433}
434
435/// Project one planner-owned route profile from the finalized logical+access plan.
436#[must_use]
437pub(in crate::db) fn project_planner_route_profile_for_model(
438    model: &EntityModel,
439    plan: &AccessPlannedQuery,
440) -> PlannerRouteProfile {
441    let secondary_order_contract = plan
442        .scalar_plan()
443        .order
444        .as_ref()
445        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
446
447    PlannerRouteProfile::new(
448        derive_continuation_policy_validated(plan),
449        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
450        secondary_order_contract,
451    )
452}
453
454fn project_static_planning_shape_for_model(
455    model: &EntityModel,
456    plan: &AccessPlannedQuery,
457) -> Result<StaticPlanningShape, InternalError> {
458    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
459    let execution_preparation_predicate = plan.execution_preparation_predicate();
460    let residual_filter_predicate = derive_residual_filter_predicate(plan);
461    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
462    let execution_preparation_compiled_predicate =
463        compile_optional_predicate(model, execution_preparation_predicate.as_ref());
464    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
465        model,
466        residual_filter_expr.as_ref(),
467        residual_filter_predicate.as_ref(),
468    )?;
469    let scalar_projection_plan = if plan.grouped_plan().is_none() {
470        Some(
471                compile_scalar_projection_plan(model, &projection_spec)
472                    .ok_or_else(|| {
473                        InternalError::query_executor_invariant(
474                            "scalar projection program must compile during static planning finalization",
475                        )
476                    })?
477                    .iter()
478                    .map(CompiledExpr::compile)
479                    .collect(),
480            )
481    } else {
482        None
483    };
484    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
485        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
486    let projection_direct_slots =
487        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
488    let projection_referenced_slots = projection_spec.referenced_slots_for(model)?;
489    let projected_slot_mask =
490        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
491    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
492    let resolved_order = resolved_order_for_plan(model, plan)?;
493    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
494    let slot_map = slot_map_for_model_plan(model, plan);
495    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
496
497    Ok(StaticPlanningShape {
498        primary_key_name: model.primary_key.name,
499        projection_spec,
500        execution_preparation_predicate,
501        residual_filter_expr,
502        residual_filter_predicate,
503        execution_preparation_compiled_predicate,
504        effective_runtime_filter_program,
505        scalar_projection_plan,
506        grouped_aggregate_execution_specs,
507        grouped_distinct_execution_strategy,
508        projection_direct_slots,
509        projection_referenced_slots,
510        projected_slot_mask,
511        projection_is_model_identity,
512        resolved_order,
513        order_referenced_slots,
514        slot_map,
515        index_compile_targets,
516    })
517}
518
519// Compile the executor-owned residual scalar filter contract once from the
520// planner-derived residual artifacts so runtime never has to rediscover
521// residual presence or shape from semantic/filter/pushdown state.
522fn compile_effective_runtime_filter_program(
523    model: &EntityModel,
524    residual_filter_expr: Option<&Expr>,
525    residual_filter_predicate: Option<&Predicate>,
526) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
527    // Keep the existing predicate fast path when the residual semantics still
528    // fit the derived predicate contract. The expression-owned lane is only
529    // needed once pushdown loses semantic coverage and a residual predicate no
530    // longer exists.
531    if let Some(predicate) = residual_filter_predicate {
532        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
533            PredicateProgram::compile(model, predicate),
534        )));
535    }
536
537    if let Some(filter_expr) = residual_filter_expr {
538        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
539            InternalError::query_invalid_logical_plan(
540                "effective runtime scalar filter expression must compile during static planning finalization",
541            )
542        })?;
543
544        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
545            CompiledExpr::compile(&compiled),
546        )));
547    }
548
549    Ok(None)
550}
551
552// Derive the executor-preparation predicate once from the selected access path.
553// This strips only filtered-index guard clauses while preserving access-bound
554// equalities that still matter to preparation/explain consumers.
555fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
556    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
557
558    match plan.access.selected_index_model() {
559        Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
560        None => Some(query_predicate.clone()),
561    }
562}
563
564// Derive the final residual predicate once from the already-filtered
565// preparation predicate plus any equality bounds guaranteed by the concrete
566// access path.
567fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
568    let filtered_residual = derive_execution_preparation_predicate(plan);
569    let filtered_residual = filtered_residual.as_ref()?;
570
571    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
572}
573
574// Derive the explicit residual semantic expression once for finalized plans.
575// The residual expression remains the planner-owned semantic filter when any
576// runtime filtering still survives access satisfaction.
577fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
578    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
579    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
580        return None;
581    }
582
583    Some(filter_expr.clone())
584}
585
586// Derive the explicit residual semantic expression during finalization using
587// the trusted entity schema so compare-family literal normalization matches the
588// planner-owned predicate contract before residual ownership is decided.
589fn derive_residual_filter_expr_for_model(
590    model: &EntityModel,
591    plan: &AccessPlannedQuery,
592) -> Option<Expr> {
593    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
594    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
595        return None;
596    }
597
598    Some(filter_expr.clone())
599}
600
601// Return whether any residual filtering survives after access planning. This
602// helper exists only for pre-finalization assembly; finalized plans must read
603// the explicit residual artifacts frozen in `StaticPlanningShape`.
604fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
605    match (
606        plan.scalar_plan().filter_expr.as_ref(),
607        plan.scalar_plan().predicate.as_ref(),
608    ) {
609        (None, None) => false,
610        (Some(_), None) => true,
611        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
612    }
613}
614
615// Return true when the planner-owned predicate contract is fully satisfied by
616// access planning and no semantic residual filter expression survives.
617fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
618    plan.scalar_plan().predicate.is_some()
619        && derive_residual_filter_predicate(plan).is_none()
620        && derive_residual_filter_expr(plan).is_none()
621}
622
623// Return true when the semantic filter expression is entirely represented by
624// the planner-owned predicate contract and the chosen access path satisfies
625// that predicate without any runtime remainder.
626const fn derive_semantic_filter_fully_satisfied_by_access_contract(
627    plan: &AccessPlannedQuery,
628) -> bool {
629    plan.scalar_plan().filter_expr.is_some()
630        && plan.scalar_plan().predicate.is_some()
631        && plan.scalar_plan().predicate_covers_filter_expr
632}
633
634// Return true when finalized planning can prove that the semantic filter
635// expression is completely represented by the planner-owned predicate contract
636// after aligning compare literals through the trusted entity schema.
637const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
638    _model: &EntityModel,
639    plan: &AccessPlannedQuery,
640) -> bool {
641    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
642}
643
644// Compile one optional planner-frozen predicate program while keeping the
645// static planning assembly path free of repeated `Option` mapping boilerplate.
646fn compile_optional_predicate(
647    model: &EntityModel,
648    predicate: Option<&Predicate>,
649) -> Option<PredicateProgram> {
650    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
651}
652
653// Resolve the grouped-only static planning semantics bundle once so grouped
654// aggregate execution specs and grouped DISTINCT strategy stay derived under
655// one shared grouped-plan branch.
656fn resolve_grouped_static_planning_semantics(
657    model: &EntityModel,
658    plan: &AccessPlannedQuery,
659    projection_spec: &ProjectionSpec,
660) -> Result<
661    (
662        Option<Vec<GroupedAggregateExecutionSpec>>,
663        Option<GroupedDistinctExecutionStrategy>,
664    ),
665    InternalError,
666> {
667    let Some(grouped) = plan.grouped_plan() else {
668        return Ok((None, None));
669    };
670
671    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
672        projection_spec,
673        grouped.group.group_fields.as_slice(),
674        grouped.group.aggregates.as_slice(),
675    )?;
676    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
677
678    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
679        model,
680        aggregate_specs.as_slice(),
681    )?);
682    let grouped_distinct_execution_strategy =
683        Some(resolved_grouped_distinct_execution_strategy_for_model(
684            model,
685            grouped.group.group_fields.as_slice(),
686            grouped.group.aggregates.as_slice(),
687            grouped.having_expr.as_ref(),
688        )?);
689
690    Ok((
691        grouped_aggregate_execution_specs,
692        grouped_distinct_execution_strategy,
693    ))
694}
695
696fn extend_grouped_having_aggregate_specs(
697    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
698    grouped: &GroupPlan,
699) -> Result<(), InternalError> {
700    if let Some(having_expr) = grouped.having_expr.as_ref() {
701        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
702    }
703
704    Ok(())
705}
706
707fn collect_grouped_having_expr_aggregate_specs(
708    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
709    expr: &Expr,
710) -> Result<(), InternalError> {
711    if !expr.contains_aggregate() {
712        return Ok(());
713    }
714
715    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
716        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
717
718        if aggregate_specs
719            .iter()
720            .all(|current| current != &aggregate_spec)
721        {
722            aggregate_specs.push(aggregate_spec);
723        }
724
725        Ok(())
726    })
727}
728
729fn projected_slot_mask_for_spec(
730    model: &EntityModel,
731    direct_projection_slots: Option<&[usize]>,
732) -> Vec<bool> {
733    let mut projected_slots = vec![false; model.fields().len()];
734
735    let Some(direct_projection_slots) = direct_projection_slots else {
736        return projected_slots;
737    };
738
739    for slot in direct_projection_slots.iter().copied() {
740        if let Some(projected) = projected_slots.get_mut(slot) {
741            *projected = true;
742        }
743    }
744
745    projected_slots
746}
747
748fn resolved_order_for_plan(
749    model: &EntityModel,
750    plan: &AccessPlannedQuery,
751) -> Result<Option<ResolvedOrder>, InternalError> {
752    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
753        return Ok(None);
754    }
755
756    let Some(order) = plan.scalar_plan().order.as_ref() else {
757        return Ok(None);
758    };
759
760    let mut fields = Vec::with_capacity(order.fields.len());
761    for term in &order.fields {
762        fields.push(ResolvedOrderField::new(
763            resolved_order_value_source_for_term(model, term)?,
764            term.direction(),
765        ));
766    }
767
768    Ok(Some(ResolvedOrder::new(fields)))
769}
770
771fn resolved_order_value_source_for_term(
772    model: &EntityModel,
773    term: &crate::db::query::plan::OrderTerm,
774) -> Result<ResolvedOrderValueSource, InternalError> {
775    if term.direct_field().is_none() {
776        let rendered = term.rendered_label();
777        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
778        let compiled = compile_scalar_projection_expr(model, term.expr())
779            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
780
781        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
782            &compiled,
783        )));
784    }
785
786    let field = term
787        .direct_field()
788        .expect("direct-field order branch should only execute for field-backed terms");
789    let slot = resolve_required_field_slot(model, field, || {
790        InternalError::query_invalid_logical_plan(format!(
791            "order expression references unknown field '{field}'",
792        ))
793    })?;
794
795    Ok(ResolvedOrderValueSource::direct_field(slot))
796}
797
798fn validate_resolved_order_expr_fields(
799    model: &EntityModel,
800    expr: &Expr,
801    rendered: &str,
802) -> Result<(), InternalError> {
803    expr.try_for_each_tree_expr(&mut |node| match node {
804        Expr::Field(field_id) => resolve_required_field_slot(model, field_id.as_str(), || {
805            InternalError::query_invalid_logical_plan(format!(
806                "order expression references unknown field '{rendered}'",
807            ))
808        })
809        .map(|_| ()),
810        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
811        #[cfg(test)]
812        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
813        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
814        _ => Ok(()),
815    })
816}
817
818// Resolve one model field slot while keeping planner invalid-logical-plan
819// error construction at the callsite that owns the diagnostic wording.
820fn resolve_required_field_slot<F>(
821    model: &EntityModel,
822    field: &str,
823    invalid_plan_error: F,
824) -> Result<usize, InternalError>
825where
826    F: FnOnce() -> InternalError,
827{
828    model
829        .resolve_field_slot(field)
830        .ok_or_else(invalid_plan_error)
831}
832
833// Keep the scalar-order expression seam violation text under one helper so the
834// parse validation and compile validation paths do not drift.
835fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
836    InternalError::query_invalid_logical_plan(format!(
837        "order expression '{rendered}' did not stay on the scalar expression seam",
838    ))
839}
840
841// Keep one stable executor-facing slot list for grouped order terms after the
842// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
843// now consumes this same referenced-slot contract instead of re-deriving order
844// sources from planner strategy at runtime.
845fn order_referenced_slots_for_resolved_order(
846    resolved_order: Option<&ResolvedOrder>,
847) -> Option<Vec<usize>> {
848    Some(resolved_order?.referenced_slots())
849}
850
851fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
852    let executable = plan.access.executable_contract();
853
854    resolved_index_slots_for_access_path(model, &executable)
855}
856
857fn resolved_index_slots_for_access_path(
858    model: &EntityModel,
859    access: &ExecutableAccessPlan<'_, crate::value::Value>,
860) -> Option<Vec<usize>> {
861    let path = access.as_path()?;
862    let path_capabilities = path.capabilities();
863    let index_fields = path_capabilities.index_fields_for_slot_map()?;
864    let mut slots = Vec::with_capacity(index_fields.len());
865
866    for field_name in index_fields {
867        let slot = model.resolve_field_slot(field_name)?;
868        slots.push(slot);
869    }
870
871    Some(slots)
872}
873
874fn index_compile_targets_for_model_plan(
875    model: &EntityModel,
876    plan: &AccessPlannedQuery,
877) -> Option<Vec<IndexCompileTarget>> {
878    let index = plan.access.as_path()?.selected_index_model()?;
879    let mut targets = Vec::new();
880
881    match index.key_items() {
882        IndexKeyItemsRef::Fields(fields) => {
883            for (component_index, &field_name) in fields.iter().enumerate() {
884                let field_slot = model.resolve_field_slot(field_name)?;
885                targets.push(IndexCompileTarget {
886                    component_index,
887                    field_slot,
888                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
889                });
890            }
891        }
892        IndexKeyItemsRef::Items(items) => {
893            for (component_index, &key_item) in items.iter().enumerate() {
894                let field_slot = model.resolve_field_slot(key_item.field())?;
895                targets.push(IndexCompileTarget {
896                    component_index,
897                    field_slot,
898                    key_item,
899                });
900            }
901        }
902    }
903
904    Some(targets)
905}