Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{IndexCompileTarget, Predicate, PredicateProgram},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                Expr, ProjectionSpec, ScalarProjectionExpr, compile_scalar_projection_expr,
19                compile_scalar_projection_plan,
20            },
21            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
23            lower_projection_identity, lower_projection_intent,
24            residual_query_predicate_after_access_path_bounds,
25            residual_query_predicate_after_filtered_access,
26            resolved_grouped_distinct_execution_strategy_for_model,
27        },
28    },
29    error::InternalError,
30    model::{entity::EntityModel, index::IndexKeyItemsRef},
31};
32
33impl QueryMode {
34    /// True if this mode represents a load intent.
35    #[must_use]
36    pub const fn is_load(&self) -> bool {
37        match self {
38            Self::Load(_) => true,
39            Self::Delete(_) => false,
40        }
41    }
42
43    /// True if this mode represents a delete intent.
44    #[must_use]
45    pub const fn is_delete(&self) -> bool {
46        match self {
47            Self::Delete(_) => true,
48            Self::Load(_) => false,
49        }
50    }
51}
52
53impl LogicalPlan {
54    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
55    #[must_use]
56    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
57        match self {
58            Self::Scalar(plan) => plan,
59            Self::Grouped(plan) => &plan.scalar,
60        }
61    }
62
63    /// Borrow scalar semantic fields mutably across logical variants for tests.
64    #[must_use]
65    #[cfg(test)]
66    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
67        match self {
68            Self::Scalar(plan) => plan,
69            Self::Grouped(plan) => &mut plan.scalar,
70        }
71    }
72
73    /// Test-only shorthand for explicit scalar semantic borrowing.
74    #[must_use]
75    #[cfg(test)]
76    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
77        self.scalar_semantics()
78    }
79
80    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
81    #[must_use]
82    #[cfg(test)]
83    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
84        self.scalar_semantics_mut()
85    }
86}
87
88impl AccessPlannedQuery {
89    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
90    #[must_use]
91    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
92        self.logical.scalar_semantics()
93    }
94
95    /// Borrow scalar semantic fields mutably across logical variants for tests.
96    #[must_use]
97    #[cfg(test)]
98    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
99        self.logical.scalar_semantics_mut()
100    }
101
102    /// Test-only shorthand for explicit scalar plan borrowing.
103    #[must_use]
104    #[cfg(test)]
105    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
106        self.scalar_plan()
107    }
108
109    /// Test-only shorthand for explicit mutable scalar plan borrowing.
110    #[must_use]
111    #[cfg(test)]
112    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
113        self.scalar_plan_mut()
114    }
115
116    /// Borrow grouped semantic fields when this plan is grouped.
117    #[must_use]
118    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
119        match &self.logical {
120            LogicalPlan::Scalar(_) => None,
121            LogicalPlan::Grouped(plan) => Some(plan),
122        }
123    }
124
125    /// Lower this plan into one canonical planner-owned projection semantic spec.
126    #[must_use]
127    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
128        if let Some(static_shape) = &self.static_planning_shape {
129            return static_shape.projection_spec.clone();
130        }
131
132        lower_projection_intent(model, &self.logical, &self.projection_selection)
133    }
134
135    /// Lower this plan into one projection semantic shape for identity hashing.
136    #[must_use]
137    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
138        lower_projection_identity(&self.logical, &self.projection_selection)
139    }
140
141    /// Return the executor-facing predicate after removing only filtered-index
142    /// guard clauses the chosen access path already proves.
143    ///
144    /// This conservative form is used by preparation/explain surfaces that
145    /// still need to see access-bound equalities as index-predicate input.
146    #[must_use]
147    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
148        if let Some(static_shape) = self.static_planning_shape.as_ref() {
149            return static_shape.execution_preparation_predicate.clone();
150        }
151
152        derive_execution_preparation_predicate(self)
153    }
154
155    /// Return the executor-facing residual predicate after removing any
156    /// filtered-index guard clauses and fixed access-bound equalities already
157    /// guaranteed by the chosen path.
158    #[must_use]
159    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
160        if let Some(static_shape) = self.static_planning_shape.as_ref() {
161            return static_shape.residual_filter_predicate.clone();
162        }
163
164        derive_residual_filter_predicate(self)
165    }
166
167    /// Return whether one explicit residual predicate survives access
168    /// planning and still participates in residual execution.
169    #[must_use]
170    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
171        self.effective_execution_predicate().is_some()
172    }
173
174    /// Borrow the planner-owned residual scalar filter expression when one
175    /// surviving semantic remainder still requires runtime evaluation.
176    #[must_use]
177    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
178        if let Some(static_shape) = self.static_planning_shape.as_ref() {
179            return static_shape.residual_filter_expr.as_ref();
180        }
181
182        if !derive_has_residual_filter(self) {
183            return None;
184        }
185
186        self.scalar_plan().filter_expr.as_ref()
187    }
188
189    /// Return whether one explicit residual scalar filter expression survives
190    /// access planning and still requires runtime evaluation.
191    #[must_use]
192    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
193        self.residual_filter_expr().is_some()
194    }
195
196    /// Borrow the planner-compiled execution-preparation predicate program.
197    #[must_use]
198    pub(in crate::db) const fn execution_preparation_compiled_predicate(
199        &self,
200    ) -> Option<&PredicateProgram> {
201        self.static_planning_shape()
202            .execution_preparation_compiled_predicate
203            .as_ref()
204    }
205
206    /// Borrow the planner-compiled effective runtime predicate program.
207    #[must_use]
208    pub(in crate::db) const fn effective_runtime_compiled_predicate(
209        &self,
210    ) -> Option<&PredicateProgram> {
211        match self
212            .static_planning_shape()
213            .effective_runtime_filter_program
214            .as_ref()
215        {
216            Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
217            Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
218        }
219    }
220
221    /// Borrow the planner-compiled effective runtime scalar filter expression.
222    #[must_use]
223    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
224        &self,
225    ) -> Option<&ScalarProjectionExpr> {
226        match self
227            .static_planning_shape()
228            .effective_runtime_filter_program
229            .as_ref()
230        {
231            Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
232            Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
233        }
234    }
235
236    /// Borrow the planner-frozen effective runtime scalar filter program.
237    #[must_use]
238    pub(in crate::db) const fn effective_runtime_filter_program(
239        &self,
240    ) -> Option<&EffectiveRuntimeFilterProgram> {
241        self.static_planning_shape()
242            .effective_runtime_filter_program
243            .as_ref()
244    }
245
246    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
247    #[must_use]
248    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
249        if !self.scalar_plan().distinct {
250            return DistinctExecutionStrategy::None;
251        }
252
253        // DISTINCT on duplicate-safe single-path access shapes is a planner
254        // no-op for runtime dedup mechanics. Composite shapes can surface
255        // duplicate keys and therefore retain explicit dedup execution.
256        match distinct_runtime_dedup_strategy(&self.access) {
257            Some(strategy) => strategy,
258            None => DistinctExecutionStrategy::None,
259        }
260    }
261
262    /// Freeze one planner-owned route profile after model validation completes.
263    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
264        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
265    }
266
267    /// Freeze planner-owned executor metadata after logical/access planning completes.
268    pub(in crate::db) fn finalize_static_planning_shape_for_model(
269        &mut self,
270        model: &EntityModel,
271    ) -> Result<(), InternalError> {
272        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
273
274        Ok(())
275    }
276
277    /// Build one immutable execution-shape signature contract for runtime layers.
278    #[must_use]
279    pub(in crate::db) fn execution_shape_signature(
280        &self,
281        entity_path: &'static str,
282    ) -> ExecutionShapeSignature {
283        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
284    }
285
286    /// Return whether the chosen access contract fully satisfies the current
287    /// scalar query predicate without any additional post-access filtering.
288    #[must_use]
289    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
290        if let Some(static_shape) = self.static_planning_shape.as_ref() {
291            return self.scalar_plan().predicate.is_some()
292                && static_shape.residual_filter_predicate.is_none()
293                && static_shape.residual_filter_expr.is_none();
294        }
295
296        derive_predicate_fully_satisfied_by_access_contract(self)
297    }
298
299    /// Borrow the planner-frozen compiled scalar projection program.
300    #[must_use]
301    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
302        self.static_planning_shape()
303            .scalar_projection_plan
304            .as_deref()
305    }
306
307    /// Borrow the planner-frozen primary-key field name.
308    #[must_use]
309    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
310        self.static_planning_shape().primary_key_name
311    }
312
313    /// Borrow the planner-frozen projection slot reachability set.
314    #[must_use]
315    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
316        self.static_planning_shape()
317            .projection_referenced_slots
318            .as_slice()
319    }
320
321    /// Borrow the planner-frozen mask for direct projected output slots.
322    #[must_use]
323    #[cfg(any(test, feature = "diagnostics"))]
324    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
325        self.static_planning_shape().projected_slot_mask.as_slice()
326    }
327
328    /// Return whether projection remains the full model-identity field list.
329    #[must_use]
330    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
331        self.static_planning_shape().projection_is_model_identity
332    }
333
334    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
335    #[must_use]
336    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
337        self.static_planning_shape()
338            .order_referenced_slots
339            .as_deref()
340    }
341
342    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
343    #[must_use]
344    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
345        self.static_planning_shape().resolved_order.as_ref()
346    }
347
348    /// Borrow the planner-frozen access slot map used by index predicate compilation.
349    #[must_use]
350    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
351        self.static_planning_shape().slot_map.as_deref()
352    }
353
354    /// Borrow grouped aggregate execution specs already resolved during static planning.
355    #[must_use]
356    pub(in crate::db) fn grouped_aggregate_execution_specs(
357        &self,
358    ) -> Option<&[GroupedAggregateExecutionSpec]> {
359        self.static_planning_shape()
360            .grouped_aggregate_execution_specs
361            .as_deref()
362    }
363
364    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
365    #[must_use]
366    pub(in crate::db) const fn grouped_distinct_execution_strategy(
367        &self,
368    ) -> Option<&GroupedDistinctExecutionStrategy> {
369        self.static_planning_shape()
370            .grouped_distinct_execution_strategy
371            .as_ref()
372    }
373
374    /// Borrow the frozen projection semantic shape without reopening model ownership.
375    #[must_use]
376    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
377        &self.static_planning_shape().projection_spec
378    }
379
380    /// Borrow the frozen direct projection slots without reopening model ownership.
381    #[must_use]
382    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
383        self.static_planning_shape()
384            .projection_direct_slots
385            .as_deref()
386    }
387
388    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
389    #[must_use]
390    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
391        self.static_planning_shape()
392            .index_compile_targets
393            .as_deref()
394    }
395
396    const fn static_planning_shape(&self) -> &StaticPlanningShape {
397        self.static_planning_shape
398            .as_ref()
399            .expect("access-planned queries must freeze static planning shape before execution")
400    }
401}
402
403fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
404    match access {
405        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
406            Some(DistinctExecutionStrategy::PreOrdered)
407        }
408        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
409            Some(DistinctExecutionStrategy::HashMaterialize)
410        }
411        AccessPlan::Path(_) => None,
412    }
413}
414
415fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
416    let is_grouped_safe = plan
417        .grouped_plan()
418        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
419
420    ContinuationPolicy::new(
421        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
422        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
423        is_grouped_safe,
424    )
425}
426
427/// Project one planner-owned route profile from the finalized logical+access plan.
428#[must_use]
429pub(in crate::db) fn project_planner_route_profile_for_model(
430    model: &EntityModel,
431    plan: &AccessPlannedQuery,
432) -> PlannerRouteProfile {
433    let secondary_order_contract = plan
434        .scalar_plan()
435        .order
436        .as_ref()
437        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
438
439    PlannerRouteProfile::new(
440        derive_continuation_policy_validated(plan),
441        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
442        secondary_order_contract,
443    )
444}
445
446fn project_static_planning_shape_for_model(
447    model: &EntityModel,
448    plan: &AccessPlannedQuery,
449) -> Result<StaticPlanningShape, InternalError> {
450    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
451    let execution_preparation_predicate = plan.execution_preparation_predicate();
452    let residual_filter_predicate = derive_residual_filter_predicate(plan);
453    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
454    let execution_preparation_compiled_predicate =
455        compile_optional_predicate(model, execution_preparation_predicate.as_ref());
456    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
457        model,
458        residual_filter_expr.as_ref(),
459        residual_filter_predicate.as_ref(),
460    )?;
461    let scalar_projection_plan =
462        if plan.grouped_plan().is_none() {
463            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
464            InternalError::query_executor_invariant(
465                "scalar projection program must compile during static planning finalization",
466            )
467        })?)
468        } else {
469            None
470        };
471    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
472        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
473    let projection_direct_slots =
474        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
475    let projection_referenced_slots = projection_spec.referenced_slots_for(model)?;
476    let projected_slot_mask =
477        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
478    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
479    let resolved_order = resolved_order_for_plan(model, plan)?;
480    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
481    let slot_map = slot_map_for_model_plan(model, plan);
482    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
483
484    Ok(StaticPlanningShape {
485        primary_key_name: model.primary_key.name,
486        projection_spec,
487        execution_preparation_predicate,
488        residual_filter_expr,
489        residual_filter_predicate,
490        execution_preparation_compiled_predicate,
491        effective_runtime_filter_program,
492        scalar_projection_plan,
493        grouped_aggregate_execution_specs,
494        grouped_distinct_execution_strategy,
495        projection_direct_slots,
496        projection_referenced_slots,
497        projected_slot_mask,
498        projection_is_model_identity,
499        resolved_order,
500        order_referenced_slots,
501        slot_map,
502        index_compile_targets,
503    })
504}
505
506// Compile the executor-owned residual scalar filter contract once from the
507// planner-derived residual artifacts so runtime never has to rediscover
508// residual presence or shape from semantic/filter/pushdown state.
509fn compile_effective_runtime_filter_program(
510    model: &EntityModel,
511    residual_filter_expr: Option<&Expr>,
512    residual_filter_predicate: Option<&Predicate>,
513) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
514    // Keep the existing predicate fast path when the residual semantics still
515    // fit the derived predicate contract. The expression-owned lane is only
516    // needed once pushdown loses semantic coverage and a residual predicate no
517    // longer exists.
518    if let Some(predicate) = residual_filter_predicate {
519        return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
520            PredicateProgram::compile(model, predicate),
521        )));
522    }
523
524    if let Some(filter_expr) = residual_filter_expr {
525        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
526            InternalError::query_invalid_logical_plan(
527                "effective runtime scalar filter expression must compile during static planning finalization",
528            )
529        })?;
530
531        return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
532    }
533
534    Ok(None)
535}
536
537// Derive the executor-preparation predicate once from the selected access path.
538// This strips only filtered-index guard clauses while preserving access-bound
539// equalities that still matter to preparation/explain consumers.
540fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
541    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
542
543    match plan.access.selected_index_model() {
544        Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
545        None => Some(query_predicate.clone()),
546    }
547}
548
549// Derive the final residual predicate once from the already-filtered
550// preparation predicate plus any equality bounds guaranteed by the concrete
551// access path.
552fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
553    let filtered_residual = derive_execution_preparation_predicate(plan);
554    let filtered_residual = filtered_residual.as_ref()?;
555
556    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
557}
558
559// Derive the explicit residual semantic expression once for finalized plans.
560// The residual expression remains the planner-owned semantic filter when any
561// runtime filtering still survives access satisfaction.
562fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
563    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
564    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
565        return None;
566    }
567
568    Some(filter_expr.clone())
569}
570
571// Derive the explicit residual semantic expression during finalization using
572// the trusted entity schema so compare-family literal normalization matches the
573// planner-owned predicate contract before residual ownership is decided.
574fn derive_residual_filter_expr_for_model(
575    model: &EntityModel,
576    plan: &AccessPlannedQuery,
577) -> Option<Expr> {
578    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
579    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
580        return None;
581    }
582
583    Some(filter_expr.clone())
584}
585
586// Return whether any residual filtering survives after access planning. This
587// helper exists only for pre-finalization assembly; finalized plans must read
588// the explicit residual artifacts frozen in `StaticPlanningShape`.
589fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
590    match (
591        plan.scalar_plan().filter_expr.as_ref(),
592        plan.scalar_plan().predicate.as_ref(),
593    ) {
594        (None, None) => false,
595        (Some(_), None) => true,
596        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
597    }
598}
599
600// Return true when the planner-owned predicate contract is fully satisfied by
601// access planning and no semantic residual filter expression survives.
602fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
603    plan.scalar_plan().predicate.is_some()
604        && derive_residual_filter_predicate(plan).is_none()
605        && derive_residual_filter_expr(plan).is_none()
606}
607
608// Return true when the semantic filter expression is entirely represented by
609// the planner-owned predicate contract and the chosen access path satisfies
610// that predicate without any runtime remainder.
611const fn derive_semantic_filter_fully_satisfied_by_access_contract(
612    plan: &AccessPlannedQuery,
613) -> bool {
614    plan.scalar_plan().filter_expr.is_some()
615        && plan.scalar_plan().predicate.is_some()
616        && plan.scalar_plan().predicate_covers_filter_expr
617}
618
619// Return true when finalized planning can prove that the semantic filter
620// expression is completely represented by the planner-owned predicate contract
621// after aligning compare literals through the trusted entity schema.
622const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
623    _model: &EntityModel,
624    plan: &AccessPlannedQuery,
625) -> bool {
626    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
627}
628
629// Compile one optional planner-frozen predicate program while keeping the
630// static planning assembly path free of repeated `Option` mapping boilerplate.
631fn compile_optional_predicate(
632    model: &EntityModel,
633    predicate: Option<&Predicate>,
634) -> Option<PredicateProgram> {
635    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
636}
637
638// Resolve the grouped-only static planning semantics bundle once so grouped
639// aggregate execution specs and grouped DISTINCT strategy stay derived under
640// one shared grouped-plan branch.
641fn resolve_grouped_static_planning_semantics(
642    model: &EntityModel,
643    plan: &AccessPlannedQuery,
644    projection_spec: &ProjectionSpec,
645) -> Result<
646    (
647        Option<Vec<GroupedAggregateExecutionSpec>>,
648        Option<GroupedDistinctExecutionStrategy>,
649    ),
650    InternalError,
651> {
652    let Some(grouped) = plan.grouped_plan() else {
653        return Ok((None, None));
654    };
655
656    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
657        projection_spec,
658        grouped.group.group_fields.as_slice(),
659        grouped.group.aggregates.as_slice(),
660    )?;
661    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
662
663    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
664        model,
665        aggregate_specs.as_slice(),
666    )?);
667    let grouped_distinct_execution_strategy =
668        Some(resolved_grouped_distinct_execution_strategy_for_model(
669            model,
670            grouped.group.group_fields.as_slice(),
671            grouped.group.aggregates.as_slice(),
672            grouped.having_expr.as_ref(),
673        )?);
674
675    Ok((
676        grouped_aggregate_execution_specs,
677        grouped_distinct_execution_strategy,
678    ))
679}
680
681fn extend_grouped_having_aggregate_specs(
682    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
683    grouped: &GroupPlan,
684) -> Result<(), InternalError> {
685    if let Some(having_expr) = grouped.having_expr.as_ref() {
686        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
687    }
688
689    Ok(())
690}
691
692fn collect_grouped_having_expr_aggregate_specs(
693    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
694    expr: &Expr,
695) -> Result<(), InternalError> {
696    if !expr.contains_aggregate() {
697        return Ok(());
698    }
699
700    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
701        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
702
703        if aggregate_specs
704            .iter()
705            .all(|current| current != &aggregate_spec)
706        {
707            aggregate_specs.push(aggregate_spec);
708        }
709
710        Ok(())
711    })
712}
713
714fn projected_slot_mask_for_spec(
715    model: &EntityModel,
716    direct_projection_slots: Option<&[usize]>,
717) -> Vec<bool> {
718    let mut projected_slots = vec![false; model.fields().len()];
719
720    let Some(direct_projection_slots) = direct_projection_slots else {
721        return projected_slots;
722    };
723
724    for slot in direct_projection_slots.iter().copied() {
725        if let Some(projected) = projected_slots.get_mut(slot) {
726            *projected = true;
727        }
728    }
729
730    projected_slots
731}
732
733fn resolved_order_for_plan(
734    model: &EntityModel,
735    plan: &AccessPlannedQuery,
736) -> Result<Option<ResolvedOrder>, InternalError> {
737    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
738        return Ok(None);
739    }
740
741    let Some(order) = plan.scalar_plan().order.as_ref() else {
742        return Ok(None);
743    };
744
745    let mut fields = Vec::with_capacity(order.fields.len());
746    for term in &order.fields {
747        fields.push(ResolvedOrderField::new(
748            resolved_order_value_source_for_term(model, term)?,
749            term.direction(),
750        ));
751    }
752
753    Ok(Some(ResolvedOrder::new(fields)))
754}
755
756fn resolved_order_value_source_for_term(
757    model: &EntityModel,
758    term: &crate::db::query::plan::OrderTerm,
759) -> Result<ResolvedOrderValueSource, InternalError> {
760    if term.direct_field().is_none() {
761        let rendered = term.rendered_label();
762        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
763        let compiled = compile_scalar_projection_expr(model, term.expr())
764            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
765
766        return Ok(ResolvedOrderValueSource::expression(compiled));
767    }
768
769    let field = term
770        .direct_field()
771        .expect("direct-field order branch should only execute for field-backed terms");
772    let slot = resolve_required_field_slot(model, field, || {
773        InternalError::query_invalid_logical_plan(format!(
774            "order expression references unknown field '{field}'",
775        ))
776    })?;
777
778    Ok(ResolvedOrderValueSource::direct_field(slot))
779}
780
781fn validate_resolved_order_expr_fields(
782    model: &EntityModel,
783    expr: &Expr,
784    rendered: &str,
785) -> Result<(), InternalError> {
786    expr.try_for_each_tree_expr(&mut |node| match node {
787        Expr::Field(field_id) => resolve_required_field_slot(model, field_id.as_str(), || {
788            InternalError::query_invalid_logical_plan(format!(
789                "order expression references unknown field '{rendered}'",
790            ))
791        })
792        .map(|_| ()),
793        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
794        #[cfg(test)]
795        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
796        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
797        _ => Ok(()),
798    })
799}
800
801// Resolve one model field slot while keeping planner invalid-logical-plan
802// error construction at the callsite that owns the diagnostic wording.
803fn resolve_required_field_slot<F>(
804    model: &EntityModel,
805    field: &str,
806    invalid_plan_error: F,
807) -> Result<usize, InternalError>
808where
809    F: FnOnce() -> InternalError,
810{
811    model
812        .resolve_field_slot(field)
813        .ok_or_else(invalid_plan_error)
814}
815
816// Keep the scalar-order expression seam violation text under one helper so the
817// parse validation and compile validation paths do not drift.
818fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
819    InternalError::query_invalid_logical_plan(format!(
820        "order expression '{rendered}' did not stay on the scalar expression seam",
821    ))
822}
823
824// Keep one stable executor-facing slot list for grouped order terms after the
825// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
826// now consumes this same referenced-slot contract instead of re-deriving order
827// sources from planner strategy at runtime.
828fn order_referenced_slots_for_resolved_order(
829    resolved_order: Option<&ResolvedOrder>,
830) -> Option<Vec<usize>> {
831    Some(resolved_order?.referenced_slots())
832}
833
834fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
835    let executable = plan.access.executable_contract();
836
837    resolved_index_slots_for_access_path(model, &executable)
838}
839
840fn resolved_index_slots_for_access_path(
841    model: &EntityModel,
842    access: &ExecutableAccessPlan<'_, crate::value::Value>,
843) -> Option<Vec<usize>> {
844    let path = access.as_path()?;
845    let path_capabilities = path.capabilities();
846    let index_fields = path_capabilities.index_fields_for_slot_map()?;
847    let mut slots = Vec::with_capacity(index_fields.len());
848
849    for field_name in index_fields {
850        let slot = model.resolve_field_slot(field_name)?;
851        slots.push(slot);
852    }
853
854    Some(slots)
855}
856
857fn index_compile_targets_for_model_plan(
858    model: &EntityModel,
859    plan: &AccessPlannedQuery,
860) -> Option<Vec<IndexCompileTarget>> {
861    let index = plan.access.as_path()?.selected_index_model()?;
862    let mut targets = Vec::new();
863
864    match index.key_items() {
865        IndexKeyItemsRef::Fields(fields) => {
866            for (component_index, &field_name) in fields.iter().enumerate() {
867                let field_slot = model.resolve_field_slot(field_name)?;
868                targets.push(IndexCompileTarget {
869                    component_index,
870                    field_slot,
871                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
872                });
873            }
874        }
875        IndexKeyItemsRef::Items(items) => {
876            for (component_index, &key_item) in items.iter().enumerate() {
877                let field_slot = model.resolve_field_slot(key_item.field())?;
878                targets.push(IndexCompileTarget {
879                    component_index,
880                    field_slot,
881                    key_item,
882                });
883            }
884        }
885    }
886
887    Some(targets)
888}