Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{IndexCompileTarget, Predicate, PredicateProgram, normalize_enum_literals},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                Expr, ProjectionSpec, ScalarProjectionExpr,
19                canonicalize_runtime_predicate_via_bool_expr, compile_scalar_projection_expr,
20                compile_scalar_projection_plan, derive_normalized_bool_expr_predicate_subset,
21                normalize_bool_expr,
22            },
23            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24            grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
25            lower_projection_identity, lower_projection_intent,
26            residual_query_predicate_after_access_path_bounds,
27            residual_query_predicate_after_filtered_access,
28            resolved_grouped_distinct_execution_strategy_for_model,
29        },
30        schema::SchemaInfo,
31    },
32    error::InternalError,
33    model::{entity::EntityModel, index::IndexKeyItemsRef},
34};
35
36impl QueryMode {
37    /// True if this mode represents a load intent.
38    #[must_use]
39    pub const fn is_load(&self) -> bool {
40        match self {
41            Self::Load(_) => true,
42            Self::Delete(_) => false,
43        }
44    }
45
46    /// True if this mode represents a delete intent.
47    #[must_use]
48    pub const fn is_delete(&self) -> bool {
49        match self {
50            Self::Delete(_) => true,
51            Self::Load(_) => false,
52        }
53    }
54}
55
56impl LogicalPlan {
57    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
58    #[must_use]
59    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
60        match self {
61            Self::Scalar(plan) => plan,
62            Self::Grouped(plan) => &plan.scalar,
63        }
64    }
65
66    /// Borrow scalar semantic fields mutably across logical variants for tests.
67    #[must_use]
68    #[cfg(test)]
69    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
70        match self {
71            Self::Scalar(plan) => plan,
72            Self::Grouped(plan) => &mut plan.scalar,
73        }
74    }
75
76    /// Test-only shorthand for explicit scalar semantic borrowing.
77    #[must_use]
78    #[cfg(test)]
79    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
80        self.scalar_semantics()
81    }
82
83    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
84    #[must_use]
85    #[cfg(test)]
86    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
87        self.scalar_semantics_mut()
88    }
89}
90
91impl AccessPlannedQuery {
92    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
93    #[must_use]
94    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
95        self.logical.scalar_semantics()
96    }
97
98    /// Borrow scalar semantic fields mutably across logical variants for tests.
99    #[must_use]
100    #[cfg(test)]
101    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
102        self.logical.scalar_semantics_mut()
103    }
104
105    /// Test-only shorthand for explicit scalar plan borrowing.
106    #[must_use]
107    #[cfg(test)]
108    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
109        self.scalar_plan()
110    }
111
112    /// Test-only shorthand for explicit mutable scalar plan borrowing.
113    #[must_use]
114    #[cfg(test)]
115    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
116        self.scalar_plan_mut()
117    }
118
119    /// Borrow grouped semantic fields when this plan is grouped.
120    #[must_use]
121    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
122        match &self.logical {
123            LogicalPlan::Scalar(_) => None,
124            LogicalPlan::Grouped(plan) => Some(plan),
125        }
126    }
127
128    /// Lower this plan into one canonical planner-owned projection semantic spec.
129    #[must_use]
130    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
131        if let Some(static_shape) = &self.static_planning_shape {
132            return static_shape.projection_spec.clone();
133        }
134
135        lower_projection_intent(model, &self.logical, &self.projection_selection)
136    }
137
138    /// Lower this plan into one projection semantic shape for identity hashing.
139    #[must_use]
140    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
141        lower_projection_identity(&self.logical, &self.projection_selection)
142    }
143
144    /// Return the executor-facing predicate after removing only filtered-index
145    /// guard clauses the chosen access path already proves.
146    ///
147    /// This conservative form is used by preparation/explain surfaces that
148    /// still need to see access-bound equalities as index-predicate input.
149    #[must_use]
150    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
151        if let Some(static_shape) = self.static_planning_shape.as_ref() {
152            return static_shape.execution_preparation_predicate.clone();
153        }
154
155        derive_execution_preparation_predicate(self)
156    }
157
158    /// Return the executor-facing residual predicate after removing any
159    /// filtered-index guard clauses and fixed access-bound equalities already
160    /// guaranteed by the chosen path.
161    #[must_use]
162    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
163        if let Some(static_shape) = self.static_planning_shape.as_ref() {
164            return static_shape.residual_filter_predicate.clone();
165        }
166
167        derive_residual_filter_predicate(self)
168    }
169
170    /// Return whether one explicit residual predicate survives access
171    /// planning and still participates in residual execution.
172    #[must_use]
173    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
174        self.effective_execution_predicate().is_some()
175    }
176
177    /// Borrow the planner-owned residual scalar filter expression when one
178    /// surviving semantic remainder still requires runtime evaluation.
179    #[must_use]
180    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
181        if let Some(static_shape) = self.static_planning_shape.as_ref() {
182            return static_shape.residual_filter_expr.as_ref();
183        }
184
185        if !derive_has_residual_filter(self) {
186            return None;
187        }
188
189        self.scalar_plan().filter_expr.as_ref()
190    }
191
192    /// Return whether one explicit residual scalar filter expression survives
193    /// access planning and still requires runtime evaluation.
194    #[must_use]
195    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
196        self.residual_filter_expr().is_some()
197    }
198
199    /// Borrow the planner-compiled execution-preparation predicate program.
200    #[must_use]
201    pub(in crate::db) const fn execution_preparation_compiled_predicate(
202        &self,
203    ) -> Option<&PredicateProgram> {
204        self.static_planning_shape()
205            .execution_preparation_compiled_predicate
206            .as_ref()
207    }
208
209    /// Borrow the planner-compiled effective runtime predicate program.
210    #[must_use]
211    pub(in crate::db) const fn effective_runtime_compiled_predicate(
212        &self,
213    ) -> Option<&PredicateProgram> {
214        match self
215            .static_planning_shape()
216            .effective_runtime_filter_program
217            .as_ref()
218        {
219            Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
220            Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
221        }
222    }
223
224    /// Borrow the planner-compiled effective runtime scalar filter expression.
225    #[must_use]
226    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
227        &self,
228    ) -> Option<&ScalarProjectionExpr> {
229        match self
230            .static_planning_shape()
231            .effective_runtime_filter_program
232            .as_ref()
233        {
234            Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
235            Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
236        }
237    }
238
239    /// Borrow the planner-frozen effective runtime scalar filter program.
240    #[must_use]
241    pub(in crate::db) const fn effective_runtime_filter_program(
242        &self,
243    ) -> Option<&EffectiveRuntimeFilterProgram> {
244        self.static_planning_shape()
245            .effective_runtime_filter_program
246            .as_ref()
247    }
248
249    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
250    #[must_use]
251    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
252        if !self.scalar_plan().distinct {
253            return DistinctExecutionStrategy::None;
254        }
255
256        // DISTINCT on duplicate-safe single-path access shapes is a planner
257        // no-op for runtime dedup mechanics. Composite shapes can surface
258        // duplicate keys and therefore retain explicit dedup execution.
259        match distinct_runtime_dedup_strategy(&self.access) {
260            Some(strategy) => strategy,
261            None => DistinctExecutionStrategy::None,
262        }
263    }
264
265    /// Freeze one planner-owned route profile after model validation completes.
266    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
267        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
268    }
269
270    /// Freeze planner-owned executor metadata after logical/access planning completes.
271    pub(in crate::db) fn finalize_static_planning_shape_for_model(
272        &mut self,
273        model: &EntityModel,
274    ) -> Result<(), InternalError> {
275        self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
276
277        Ok(())
278    }
279
280    /// Build one immutable execution-shape signature contract for runtime layers.
281    #[must_use]
282    pub(in crate::db) fn execution_shape_signature(
283        &self,
284        entity_path: &'static str,
285    ) -> ExecutionShapeSignature {
286        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
287    }
288
289    /// Return whether the chosen access contract fully satisfies the current
290    /// scalar query predicate without any additional post-access filtering.
291    #[must_use]
292    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
293        if let Some(static_shape) = self.static_planning_shape.as_ref() {
294            return self.scalar_plan().predicate.is_some()
295                && static_shape.residual_filter_predicate.is_none()
296                && static_shape.residual_filter_expr.is_none();
297        }
298
299        derive_predicate_fully_satisfied_by_access_contract(self)
300    }
301
302    /// Borrow the planner-frozen compiled scalar projection program.
303    #[must_use]
304    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
305        self.static_planning_shape()
306            .scalar_projection_plan
307            .as_deref()
308    }
309
310    /// Borrow the planner-frozen primary-key field name.
311    #[must_use]
312    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
313        self.static_planning_shape().primary_key_name
314    }
315
316    /// Borrow the planner-frozen projection slot reachability set.
317    #[must_use]
318    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
319        self.static_planning_shape()
320            .projection_referenced_slots
321            .as_slice()
322    }
323
324    /// Borrow the planner-frozen mask for direct projected output slots.
325    #[must_use]
326    #[cfg(any(test, feature = "diagnostics"))]
327    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
328        self.static_planning_shape().projected_slot_mask.as_slice()
329    }
330
331    /// Return whether projection remains the full model-identity field list.
332    #[must_use]
333    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
334        self.static_planning_shape().projection_is_model_identity
335    }
336
337    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
338    #[must_use]
339    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
340        self.static_planning_shape()
341            .order_referenced_slots
342            .as_deref()
343    }
344
345    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
346    #[must_use]
347    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
348        self.static_planning_shape().resolved_order.as_ref()
349    }
350
351    /// Borrow the planner-frozen access slot map used by index predicate compilation.
352    #[must_use]
353    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
354        self.static_planning_shape().slot_map.as_deref()
355    }
356
357    /// Borrow grouped aggregate execution specs already resolved during static planning.
358    #[must_use]
359    pub(in crate::db) fn grouped_aggregate_execution_specs(
360        &self,
361    ) -> Option<&[GroupedAggregateExecutionSpec]> {
362        self.static_planning_shape()
363            .grouped_aggregate_execution_specs
364            .as_deref()
365    }
366
367    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
368    #[must_use]
369    pub(in crate::db) const fn grouped_distinct_execution_strategy(
370        &self,
371    ) -> Option<&GroupedDistinctExecutionStrategy> {
372        self.static_planning_shape()
373            .grouped_distinct_execution_strategy
374            .as_ref()
375    }
376
377    /// Borrow the frozen projection semantic shape without reopening model ownership.
378    #[must_use]
379    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
380        &self.static_planning_shape().projection_spec
381    }
382
383    /// Borrow the frozen direct projection slots without reopening model ownership.
384    #[must_use]
385    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
386        self.static_planning_shape()
387            .projection_direct_slots
388            .as_deref()
389    }
390
391    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
392    #[must_use]
393    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
394        self.static_planning_shape()
395            .index_compile_targets
396            .as_deref()
397    }
398
399    const fn static_planning_shape(&self) -> &StaticPlanningShape {
400        self.static_planning_shape
401            .as_ref()
402            .expect("access-planned queries must freeze static planning shape before execution")
403    }
404}
405
406fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
407    match access {
408        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
409            Some(DistinctExecutionStrategy::PreOrdered)
410        }
411        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
412            Some(DistinctExecutionStrategy::HashMaterialize)
413        }
414        AccessPlan::Path(_) => None,
415    }
416}
417
418fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
419    let is_grouped_safe = plan
420        .grouped_plan()
421        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
422
423    ContinuationPolicy::new(
424        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
425        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
426        is_grouped_safe,
427    )
428}
429
430/// Project one planner-owned route profile from the finalized logical+access plan.
431#[must_use]
432pub(in crate::db) fn project_planner_route_profile_for_model(
433    model: &EntityModel,
434    plan: &AccessPlannedQuery,
435) -> PlannerRouteProfile {
436    let secondary_order_contract = plan
437        .scalar_plan()
438        .order
439        .as_ref()
440        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
441
442    PlannerRouteProfile::new(
443        derive_continuation_policy_validated(plan),
444        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
445        secondary_order_contract,
446    )
447}
448
449fn project_static_planning_shape_for_model(
450    model: &EntityModel,
451    plan: &AccessPlannedQuery,
452) -> Result<StaticPlanningShape, InternalError> {
453    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
454    let execution_preparation_predicate = plan.execution_preparation_predicate();
455    let residual_filter_predicate = derive_residual_filter_predicate(plan);
456    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
457    let execution_preparation_compiled_predicate =
458        compile_optional_predicate(model, execution_preparation_predicate.as_ref());
459    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
460        model,
461        residual_filter_expr.as_ref(),
462        residual_filter_predicate.as_ref(),
463    )?;
464    let scalar_projection_plan =
465        if plan.grouped_plan().is_none() {
466            Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
467            InternalError::query_executor_invariant(
468                "scalar projection program must compile during static planning finalization",
469            )
470        })?)
471        } else {
472            None
473        };
474    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
475        resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
476    let projection_direct_slots =
477        lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
478    let projection_referenced_slots = projection_spec.referenced_slots_for(model)?;
479    let projected_slot_mask =
480        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
481    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
482    let resolved_order = resolved_order_for_plan(model, plan)?;
483    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
484    let slot_map = slot_map_for_model_plan(model, plan);
485    let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
486
487    Ok(StaticPlanningShape {
488        primary_key_name: model.primary_key.name,
489        projection_spec,
490        execution_preparation_predicate,
491        residual_filter_expr,
492        residual_filter_predicate,
493        execution_preparation_compiled_predicate,
494        effective_runtime_filter_program,
495        scalar_projection_plan,
496        grouped_aggregate_execution_specs,
497        grouped_distinct_execution_strategy,
498        projection_direct_slots,
499        projection_referenced_slots,
500        projected_slot_mask,
501        projection_is_model_identity,
502        resolved_order,
503        order_referenced_slots,
504        slot_map,
505        index_compile_targets,
506    })
507}
508
509// Compile the executor-owned residual scalar filter contract once from the
510// planner-derived residual artifacts so runtime never has to rediscover
511// residual presence or shape from semantic/filter/pushdown state.
512fn compile_effective_runtime_filter_program(
513    model: &EntityModel,
514    residual_filter_expr: Option<&Expr>,
515    residual_filter_predicate: Option<&Predicate>,
516) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
517    // Keep the existing predicate fast path when the residual semantics still
518    // fit the derived predicate contract. The expression-owned lane is only
519    // needed once pushdown loses semantic coverage and a residual predicate no
520    // longer exists.
521    if let Some(predicate) = residual_filter_predicate {
522        return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
523            PredicateProgram::compile(model, predicate),
524        )));
525    }
526
527    if let Some(filter_expr) = residual_filter_expr {
528        let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
529            InternalError::query_invalid_logical_plan(
530                "effective runtime scalar filter expression must compile during static planning finalization",
531            )
532        })?;
533
534        return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
535    }
536
537    Ok(None)
538}
539
540// Derive the executor-preparation predicate once from the selected access path.
541// This strips only filtered-index guard clauses while preserving access-bound
542// equalities that still matter to preparation/explain consumers.
543fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
544    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
545
546    match plan.access.selected_index_model() {
547        Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
548        None => Some(query_predicate.clone()),
549    }
550}
551
552// Derive the final residual predicate once from the already-filtered
553// preparation predicate plus any equality bounds guaranteed by the concrete
554// access path.
555fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
556    let filtered_residual = derive_execution_preparation_predicate(plan);
557    let filtered_residual = filtered_residual.as_ref()?;
558
559    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
560}
561
562// Derive the explicit residual semantic expression once for finalized plans.
563// The residual expression remains the planner-owned semantic filter when any
564// runtime filtering still survives access satisfaction.
565fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
566    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
567    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
568        return None;
569    }
570
571    Some(filter_expr.clone())
572}
573
574// Derive the explicit residual semantic expression during finalization using
575// the trusted entity schema so compare-family literal normalization matches the
576// planner-owned predicate contract before residual ownership is decided.
577fn derive_residual_filter_expr_for_model(
578    model: &EntityModel,
579    plan: &AccessPlannedQuery,
580) -> Option<Expr> {
581    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
582    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
583        return None;
584    }
585
586    Some(filter_expr.clone())
587}
588
589// Return whether any residual filtering survives after access planning. This
590// helper exists only for pre-finalization assembly; finalized plans must read
591// the explicit residual artifacts frozen in `StaticPlanningShape`.
592fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
593    match (
594        plan.scalar_plan().filter_expr.as_ref(),
595        plan.scalar_plan().predicate.as_ref(),
596    ) {
597        (None, None) => false,
598        (Some(_), None) => true,
599        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
600    }
601}
602
603// Return true when the planner-owned predicate contract is fully satisfied by
604// access planning and no semantic residual filter expression survives.
605fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
606    plan.scalar_plan().predicate.is_some()
607        && derive_residual_filter_predicate(plan).is_none()
608        && derive_residual_filter_expr(plan).is_none()
609}
610
611// Return true when the semantic filter expression is entirely represented by
612// the planner-owned predicate contract and the chosen access path satisfies
613// that predicate without any runtime remainder.
614fn derive_semantic_filter_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
615    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
616        return false;
617    };
618    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
619    let Some(filter_predicate) =
620        derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
621    else {
622        return false;
623    };
624    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
625        return false;
626    };
627
628    canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
629        == canonicalize_runtime_predicate_via_bool_expr(query_predicate.clone())
630}
631
632// Return true when finalized planning can prove that the semantic filter
633// expression is completely represented by the planner-owned predicate contract
634// after aligning compare literals through the trusted entity schema.
635fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
636    model: &EntityModel,
637    plan: &AccessPlannedQuery,
638) -> bool {
639    let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
640        return false;
641    };
642    let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
643    let Some(filter_predicate) =
644        derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
645    else {
646        return false;
647    };
648    let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
649        return false;
650    };
651    let schema = SchemaInfo::cached_for_entity_model(model);
652    let Ok(filter_predicate) = normalize_enum_literals(schema, &filter_predicate) else {
653        return false;
654    };
655    let Ok(query_predicate) = normalize_enum_literals(schema, query_predicate) else {
656        return false;
657    };
658
659    canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
660        == canonicalize_runtime_predicate_via_bool_expr(query_predicate)
661}
662
663// Compile one optional planner-frozen predicate program while keeping the
664// static planning assembly path free of repeated `Option` mapping boilerplate.
665fn compile_optional_predicate(
666    model: &EntityModel,
667    predicate: Option<&Predicate>,
668) -> Option<PredicateProgram> {
669    predicate.map(|predicate| PredicateProgram::compile(model, predicate))
670}
671
672// Resolve the grouped-only static planning semantics bundle once so grouped
673// aggregate execution specs and grouped DISTINCT strategy stay derived under
674// one shared grouped-plan branch.
675fn resolve_grouped_static_planning_semantics(
676    model: &EntityModel,
677    plan: &AccessPlannedQuery,
678    projection_spec: &ProjectionSpec,
679) -> Result<
680    (
681        Option<Vec<GroupedAggregateExecutionSpec>>,
682        Option<GroupedDistinctExecutionStrategy>,
683    ),
684    InternalError,
685> {
686    let Some(grouped) = plan.grouped_plan() else {
687        return Ok((None, None));
688    };
689
690    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
691        projection_spec,
692        grouped.group.group_fields.as_slice(),
693        grouped.group.aggregates.as_slice(),
694    )?;
695    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
696
697    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
698        model,
699        aggregate_specs.as_slice(),
700    )?);
701    let grouped_distinct_execution_strategy =
702        Some(resolved_grouped_distinct_execution_strategy_for_model(
703            model,
704            grouped.group.group_fields.as_slice(),
705            grouped.group.aggregates.as_slice(),
706            grouped.having_expr.as_ref(),
707        )?);
708
709    Ok((
710        grouped_aggregate_execution_specs,
711        grouped_distinct_execution_strategy,
712    ))
713}
714
715fn extend_grouped_having_aggregate_specs(
716    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
717    grouped: &GroupPlan,
718) -> Result<(), InternalError> {
719    if let Some(having_expr) = grouped.having_expr.as_ref() {
720        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
721    }
722
723    Ok(())
724}
725
726fn collect_grouped_having_expr_aggregate_specs(
727    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
728    expr: &Expr,
729) -> Result<(), InternalError> {
730    match expr {
731        Expr::Aggregate(aggregate_expr) => {
732            let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
733
734            if aggregate_specs
735                .iter()
736                .all(|current| current != &aggregate_spec)
737            {
738                aggregate_specs.push(aggregate_spec);
739            }
740        }
741        Expr::Field(_) | Expr::Literal(_) => {}
742        Expr::FunctionCall { args, .. } => {
743            for arg in args {
744                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
745            }
746        }
747        Expr::Unary { expr, .. } => {
748            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
749        }
750        Expr::Case {
751            when_then_arms,
752            else_expr,
753        } => {
754            for arm in when_then_arms {
755                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
756                collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
757            }
758
759            collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
760        }
761        Expr::Binary { left, right, .. } => {
762            collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
763            collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
764        }
765        #[cfg(test)]
766        Expr::Alias { expr, .. } => {
767            collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
768        }
769    }
770
771    Ok(())
772}
773
774fn projected_slot_mask_for_spec(
775    model: &EntityModel,
776    direct_projection_slots: Option<&[usize]>,
777) -> Vec<bool> {
778    let mut projected_slots = vec![false; model.fields().len()];
779
780    let Some(direct_projection_slots) = direct_projection_slots else {
781        return projected_slots;
782    };
783
784    for slot in direct_projection_slots.iter().copied() {
785        if let Some(projected) = projected_slots.get_mut(slot) {
786            *projected = true;
787        }
788    }
789
790    projected_slots
791}
792
793fn resolved_order_for_plan(
794    model: &EntityModel,
795    plan: &AccessPlannedQuery,
796) -> Result<Option<ResolvedOrder>, InternalError> {
797    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
798        return Ok(None);
799    }
800
801    let Some(order) = plan.scalar_plan().order.as_ref() else {
802        return Ok(None);
803    };
804
805    let mut fields = Vec::with_capacity(order.fields.len());
806    for term in &order.fields {
807        fields.push(ResolvedOrderField::new(
808            resolved_order_value_source_for_term(model, term)?,
809            term.direction(),
810        ));
811    }
812
813    Ok(Some(ResolvedOrder::new(fields)))
814}
815
816fn resolved_order_value_source_for_term(
817    model: &EntityModel,
818    term: &crate::db::query::plan::OrderTerm,
819) -> Result<ResolvedOrderValueSource, InternalError> {
820    if term.direct_field().is_none() {
821        let rendered = term.rendered_label();
822        validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
823        let compiled = compile_scalar_projection_expr(model, term.expr())
824            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
825
826        return Ok(ResolvedOrderValueSource::expression(compiled));
827    }
828
829    let field = term
830        .direct_field()
831        .expect("direct-field order branch should only execute for field-backed terms");
832    let slot = resolve_required_field_slot(model, field, || {
833        InternalError::query_invalid_logical_plan(format!(
834            "order expression references unknown field '{field}'",
835        ))
836    })?;
837
838    Ok(ResolvedOrderValueSource::direct_field(slot))
839}
840
841fn validate_resolved_order_expr_fields(
842    model: &EntityModel,
843    expr: &Expr,
844    rendered: &str,
845) -> Result<(), InternalError> {
846    match expr {
847        Expr::Field(field_id) => {
848            resolve_required_field_slot(model, field_id.as_str(), || {
849                InternalError::query_invalid_logical_plan(format!(
850                    "order expression references unknown field '{rendered}'",
851                ))
852            })?;
853        }
854        Expr::Literal(_) => {}
855        Expr::FunctionCall { args, .. } => {
856            for arg in args {
857                validate_resolved_order_expr_fields(model, arg, rendered)?;
858            }
859        }
860        Expr::Case {
861            when_then_arms,
862            else_expr,
863        } => {
864            for arm in when_then_arms {
865                validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
866                validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
867            }
868            validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
869        }
870        Expr::Binary { left, right, .. } => {
871            validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
872            validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
873        }
874        Expr::Aggregate(_) => {
875            return Err(order_expression_scalar_seam_error(rendered));
876        }
877        #[cfg(test)]
878        Expr::Alias { .. } => {
879            return Err(order_expression_scalar_seam_error(rendered));
880        }
881        Expr::Unary { .. } => {
882            return Err(order_expression_scalar_seam_error(rendered));
883        }
884    }
885
886    Ok(())
887}
888
889// Resolve one model field slot while keeping planner invalid-logical-plan
890// error construction at the callsite that owns the diagnostic wording.
891fn resolve_required_field_slot<F>(
892    model: &EntityModel,
893    field: &str,
894    invalid_plan_error: F,
895) -> Result<usize, InternalError>
896where
897    F: FnOnce() -> InternalError,
898{
899    model
900        .resolve_field_slot(field)
901        .ok_or_else(invalid_plan_error)
902}
903
904// Keep the scalar-order expression seam violation text under one helper so the
905// parse validation and compile validation paths do not drift.
906fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
907    InternalError::query_invalid_logical_plan(format!(
908        "order expression '{rendered}' did not stay on the scalar expression seam",
909    ))
910}
911
912// Keep one stable executor-facing slot list for grouped order terms after the
913// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
914// now consumes this same referenced-slot contract instead of re-deriving order
915// sources from planner strategy at runtime.
916fn order_referenced_slots_for_resolved_order(
917    resolved_order: Option<&ResolvedOrder>,
918) -> Option<Vec<usize>> {
919    let resolved_order = resolved_order?;
920    let mut referenced = Vec::new();
921
922    // Keep one stable slot list without re-parsing order expressions after the
923    // planner has already frozen structural ORDER BY sources.
924    for field in resolved_order.fields() {
925        field.source().extend_referenced_slots(&mut referenced);
926    }
927
928    Some(referenced)
929}
930
931fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
932    let access_strategy = plan.access.resolve_strategy();
933    let executable = access_strategy.executable();
934
935    resolved_index_slots_for_access_path(model, executable)
936}
937
938fn resolved_index_slots_for_access_path(
939    model: &EntityModel,
940    access: &ExecutableAccessPlan<'_, crate::value::Value>,
941) -> Option<Vec<usize>> {
942    let path = access.as_path()?;
943    let path_capabilities = path.capabilities();
944    let index_fields = path_capabilities.index_fields_for_slot_map()?;
945    let mut slots = Vec::with_capacity(index_fields.len());
946
947    for field_name in index_fields {
948        let slot = model.resolve_field_slot(field_name)?;
949        slots.push(slot);
950    }
951
952    Some(slots)
953}
954
955fn index_compile_targets_for_model_plan(
956    model: &EntityModel,
957    plan: &AccessPlannedQuery,
958) -> Option<Vec<IndexCompileTarget>> {
959    let index = plan.access.as_path()?.selected_index_model()?;
960    let mut targets = Vec::new();
961
962    match index.key_items() {
963        IndexKeyItemsRef::Fields(fields) => {
964            for (component_index, &field_name) in fields.iter().enumerate() {
965                let field_slot = model.resolve_field_slot(field_name)?;
966                targets.push(IndexCompileTarget {
967                    component_index,
968                    field_slot,
969                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
970                });
971            }
972        }
973        IndexKeyItemsRef::Items(items) => {
974            for (component_index, &key_item) in items.iter().enumerate() {
975                let field_slot = model.resolve_field_slot(key_item.field())?;
976                targets.push(IndexCompileTarget {
977                    component_index,
978                    field_slot,
979                    key_item,
980                });
981            }
982        }
983    }
984
985    Some(targets)
986}