Skip to main content

icydb_core/db/query/plan/semantics/
logical.rs

1//! Module: query::plan::semantics::logical
2//! Responsibility: logical-plan semantic lowering from planner contracts to access-planned queries.
3//! Does not own: access-path index selection internals or runtime execution behavior.
4//! Boundary: derives planner-owned execution semantics, shape signatures, and continuation policy.
5
6use crate::{
7    db::{
8        access::{AccessPlan, ExecutableAccessPlan},
9        predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10        query::plan::{
11            AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12            EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13            GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14            LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15            ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16            derive_logical_pushdown_eligibility,
17            expr::{
18                CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19                compile_scalar_projection_plan_with_schema,
20            },
21            grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22            grouped_cursor_policy_violation, grouped_plan_strategy,
23            lower_data_row_direct_projection_slots_with_schema,
24            lower_direct_projection_slots_with_schema, lower_projection_identity,
25            lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26            residual_query_predicate_after_filtered_access_contract,
27            resolved_grouped_distinct_execution_strategy_with_schema_info,
28        },
29        schema::SchemaInfo,
30    },
31    error::InternalError,
32    model::{entity::EntityModel, index::IndexKeyItemsRef},
33};
34
35impl QueryMode {
36    /// True if this mode represents a load intent.
37    #[must_use]
38    pub const fn is_load(&self) -> bool {
39        match self {
40            Self::Load(_) => true,
41            Self::Delete(_) => false,
42        }
43    }
44
45    /// True if this mode represents a delete intent.
46    #[must_use]
47    pub const fn is_delete(&self) -> bool {
48        match self {
49            Self::Delete(_) => true,
50            Self::Load(_) => false,
51        }
52    }
53}
54
55impl LogicalPlan {
56    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
57    #[must_use]
58    pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
59        match self {
60            Self::Scalar(plan) => plan,
61            Self::Grouped(plan) => &plan.scalar,
62        }
63    }
64
65    /// Borrow scalar semantic fields mutably across logical variants for tests.
66    #[must_use]
67    #[cfg(test)]
68    pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
69        match self {
70            Self::Scalar(plan) => plan,
71            Self::Grouped(plan) => &mut plan.scalar,
72        }
73    }
74
75    /// Test-only shorthand for explicit scalar semantic borrowing.
76    #[must_use]
77    #[cfg(test)]
78    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
79        self.scalar_semantics()
80    }
81
82    /// Test-only shorthand for explicit mutable scalar semantic borrowing.
83    #[must_use]
84    #[cfg(test)]
85    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
86        self.scalar_semantics_mut()
87    }
88}
89
90impl AccessPlannedQuery {
91    /// Borrow scalar semantic fields shared by scalar/grouped logical variants.
92    #[must_use]
93    pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
94        self.logical.scalar_semantics()
95    }
96
97    /// Borrow scalar missing-row consistency without exposing the full scalar
98    /// plan to executor owners that only need row-presence policy.
99    #[must_use]
100    pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
101        self.scalar_plan().consistency
102    }
103
104    /// Borrow scalar semantic fields mutably across logical variants for tests.
105    #[must_use]
106    #[cfg(test)]
107    pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
108        self.logical.scalar_semantics_mut()
109    }
110
111    /// Test-only shorthand for explicit scalar plan borrowing.
112    #[must_use]
113    #[cfg(test)]
114    pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
115        self.scalar_plan()
116    }
117
118    /// Test-only shorthand for explicit mutable scalar plan borrowing.
119    #[must_use]
120    #[cfg(test)]
121    pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
122        self.scalar_plan_mut()
123    }
124
125    /// Borrow grouped semantic fields when this plan is grouped.
126    #[must_use]
127    pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
128        match &self.logical {
129            LogicalPlan::Scalar(_) => None,
130            LogicalPlan::Grouped(plan) => Some(plan),
131        }
132    }
133
134    /// Lower this plan into one canonical planner-owned projection semantic spec.
135    #[must_use]
136    pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
137        if let Some(static_shape) = &self.static_planning_shape {
138            return static_shape.projection_spec.clone();
139        }
140
141        lower_projection_intent(model, &self.logical, &self.projection_selection)
142    }
143
144    /// Lower this plan into one projection semantic shape for identity hashing.
145    #[must_use]
146    pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
147        lower_projection_identity(&self.logical, &self.projection_selection)
148    }
149
150    /// Return the executor-facing predicate after removing only filtered-index
151    /// guard clauses the chosen access path already proves.
152    ///
153    /// This conservative form is used by preparation/explain surfaces that
154    /// still need to see access-bound equalities as index-predicate input.
155    #[must_use]
156    pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
157        if let Some(static_shape) = self.static_planning_shape.as_ref() {
158            return static_shape.execution_preparation_predicate.clone();
159        }
160
161        derive_execution_preparation_predicate(self)
162    }
163
164    /// Return the executor-facing residual predicate after removing any
165    /// filtered-index guard clauses and fixed access-bound equalities already
166    /// guaranteed by the chosen path.
167    #[must_use]
168    pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
169        if let Some(static_shape) = self.static_planning_shape.as_ref() {
170            return static_shape.residual_filter_predicate.clone();
171        }
172
173        derive_residual_filter_predicate(self)
174    }
175
176    /// Return whether one explicit residual predicate survives access
177    /// planning and still participates in residual execution.
178    #[must_use]
179    pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
180        self.effective_execution_predicate().is_some()
181    }
182
183    /// Borrow the planner-owned residual scalar filter expression when one
184    /// surviving semantic remainder still requires runtime evaluation.
185    #[must_use]
186    pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
187        if let Some(static_shape) = self.static_planning_shape.as_ref() {
188            return static_shape.residual_filter_expr.as_ref();
189        }
190
191        if !derive_has_residual_filter(self) {
192            return None;
193        }
194
195        self.scalar_plan().filter_expr.as_ref()
196    }
197
198    /// Return whether one explicit residual scalar filter expression survives
199    /// access planning and still requires runtime evaluation.
200    #[must_use]
201    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
202        self.residual_filter_expr().is_some()
203    }
204
205    /// Borrow the planner-compiled execution-preparation predicate program.
206    #[must_use]
207    pub(in crate::db) const fn execution_preparation_compiled_predicate(
208        &self,
209    ) -> Option<&PredicateProgram> {
210        self.static_planning_shape()
211            .execution_preparation_compiled_predicate
212            .as_ref()
213    }
214
215    /// Borrow the planner-compiled effective runtime predicate program.
216    #[must_use]
217    pub(in crate::db) const fn effective_runtime_compiled_predicate(
218        &self,
219    ) -> Option<&PredicateProgram> {
220        match self
221            .static_planning_shape()
222            .effective_runtime_filter_program
223            .as_ref()
224        {
225            Some(program) => program.predicate_program(),
226            None => None,
227        }
228    }
229
230    /// Borrow the planner-compiled effective runtime scalar filter expression.
231    #[cfg(test)]
232    #[must_use]
233    pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
234        &self,
235    ) -> Option<&CompiledExpr> {
236        match self
237            .static_planning_shape()
238            .effective_runtime_filter_program
239            .as_ref()
240        {
241            Some(program) => program.expression_filter(),
242            None => None,
243        }
244    }
245
246    /// Borrow the planner-frozen effective runtime scalar filter program.
247    #[must_use]
248    pub(in crate::db) const fn effective_runtime_filter_program(
249        &self,
250    ) -> Option<&EffectiveRuntimeFilterProgram> {
251        self.static_planning_shape()
252            .effective_runtime_filter_program
253            .as_ref()
254    }
255
256    /// Lower scalar DISTINCT semantics into one executor-facing execution strategy.
257    #[must_use]
258    pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
259        if !self.scalar_plan().distinct {
260            return DistinctExecutionStrategy::None;
261        }
262
263        // DISTINCT on duplicate-safe single-path access shapes is a planner
264        // no-op for runtime dedup mechanics. Composite shapes can surface
265        // duplicate keys and therefore retain explicit dedup execution.
266        match distinct_runtime_dedup_strategy(&self.access) {
267            Some(strategy) => strategy,
268            None => DistinctExecutionStrategy::None,
269        }
270    }
271
272    /// Freeze one planner-owned route profile after model validation completes.
273    pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
274        self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
275    }
276
277    /// Freeze model-only executor metadata after logical/access planning completes.
278    #[cfg(test)]
279    pub(in crate::db) fn finalize_static_planning_shape_for_model_only(
280        &mut self,
281        model: &EntityModel,
282    ) -> Result<(), InternalError> {
283        self.finalize_static_planning_shape_for_model_with_schema(
284            model,
285            SchemaInfo::cached_for_generated_entity_model(model),
286        )
287    }
288
289    /// Freeze planner-owned executor metadata with explicit schema authority.
290    pub(in crate::db) fn finalize_static_planning_shape_for_model_with_schema(
291        &mut self,
292        model: &EntityModel,
293        schema_info: &SchemaInfo,
294    ) -> Result<(), InternalError> {
295        self.static_planning_shape = Some(project_static_planning_shape_for_model(
296            model,
297            schema_info,
298            self,
299        )?);
300
301        Ok(())
302    }
303
304    /// Build one immutable execution-shape signature contract for runtime layers.
305    #[must_use]
306    pub(in crate::db) fn execution_shape_signature(
307        &self,
308        entity_path: &'static str,
309    ) -> ExecutionShapeSignature {
310        ExecutionShapeSignature::new(self.continuation_signature(entity_path))
311    }
312
313    /// Return whether the chosen access contract fully satisfies the current
314    /// scalar query predicate without any additional post-access filtering.
315    #[must_use]
316    pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
317        if let Some(static_shape) = self.static_planning_shape.as_ref() {
318            return self.scalar_plan().predicate.is_some()
319                && static_shape.residual_filter_predicate.is_none()
320                && static_shape.residual_filter_expr.is_none();
321        }
322
323        derive_predicate_fully_satisfied_by_access_contract(self)
324    }
325
326    /// Borrow the planner-frozen compiled scalar projection program.
327    #[must_use]
328    pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
329        self.static_planning_shape()
330            .scalar_projection_plan
331            .as_deref()
332    }
333
334    /// Return whether planner-owned static execution metadata has already been frozen.
335    #[must_use]
336    pub(in crate::db) const fn has_static_planning_shape(&self) -> bool {
337        self.static_planning_shape.is_some()
338    }
339
340    /// Borrow the planner-frozen primary-key field name.
341    #[must_use]
342    pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
343        self.static_planning_shape().primary_key_name
344    }
345
346    /// Borrow the planner-frozen projection slot reachability set.
347    #[must_use]
348    pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
349        self.static_planning_shape()
350            .projection_referenced_slots
351            .as_slice()
352    }
353
354    /// Borrow the planner-frozen mask for direct projected output slots.
355    #[must_use]
356    #[cfg(any(test, feature = "diagnostics"))]
357    pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
358        self.static_planning_shape().projected_slot_mask.as_slice()
359    }
360
361    /// Return whether projection remains the full model-identity field list.
362    #[must_use]
363    pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
364        self.static_planning_shape().projection_is_model_identity
365    }
366
367    /// Borrow the planner-frozen ORDER BY slot reachability set, if any.
368    #[must_use]
369    pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
370        self.static_planning_shape()
371            .order_referenced_slots
372            .as_deref()
373    }
374
375    /// Borrow the planner-frozen resolved ORDER BY program, if one exists.
376    #[must_use]
377    pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
378        self.static_planning_shape().resolved_order.as_ref()
379    }
380
381    /// Borrow the planner-frozen access slot map used by index predicate compilation.
382    #[must_use]
383    pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
384        self.static_planning_shape().slot_map.as_deref()
385    }
386
387    /// Borrow grouped aggregate execution specs already resolved during static planning.
388    #[must_use]
389    pub(in crate::db) fn grouped_aggregate_execution_specs(
390        &self,
391    ) -> Option<&[GroupedAggregateExecutionSpec]> {
392        self.static_planning_shape()
393            .grouped_aggregate_execution_specs
394            .as_deref()
395    }
396
397    /// Borrow the planner-resolved grouped DISTINCT execution strategy when present.
398    #[must_use]
399    pub(in crate::db) const fn grouped_distinct_execution_strategy(
400        &self,
401    ) -> Option<&GroupedDistinctExecutionStrategy> {
402        self.static_planning_shape()
403            .grouped_distinct_execution_strategy
404            .as_ref()
405    }
406
407    /// Borrow the frozen projection semantic shape without reopening model ownership.
408    #[must_use]
409    pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
410        &self.static_planning_shape().projection_spec
411    }
412
413    /// Borrow the frozen direct projection slots without reopening model ownership.
414    #[must_use]
415    pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
416        self.static_planning_shape()
417            .projection_direct_slots
418            .as_deref()
419    }
420
421    /// Borrow duplicate-preserving direct projection slots for raw data-row readers.
422    #[must_use]
423    pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
424        self.static_planning_shape()
425            .projection_data_row_direct_slots
426            .as_deref()
427    }
428
429    /// Borrow the planner-frozen key-item-aware compile targets for the chosen access path.
430    #[must_use]
431    pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
432        self.static_planning_shape()
433            .index_compile_targets
434            .as_deref()
435    }
436
437    const fn static_planning_shape(&self) -> &StaticPlanningShape {
438        self.static_planning_shape
439            .as_ref()
440            .expect("access-planned queries must freeze static planning shape before execution")
441    }
442}
443
444fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
445    match access {
446        AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
447            Some(DistinctExecutionStrategy::PreOrdered)
448        }
449        AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
450            Some(DistinctExecutionStrategy::HashMaterialize)
451        }
452        AccessPlan::Path(_) => None,
453    }
454}
455
456fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
457    let is_grouped_safe = plan
458        .grouped_plan()
459        .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
460
461    ContinuationPolicy::new(
462        true, // Continuation resume windows require anchor semantics for pushdown-safe replay.
463        true, // Continuation resumes must advance strictly to prevent replay/regression loops.
464        is_grouped_safe,
465    )
466}
467
468/// Project one planner-owned route profile from the finalized logical+access plan.
469#[must_use]
470pub(in crate::db) fn project_planner_route_profile_for_model(
471    model: &EntityModel,
472    plan: &AccessPlannedQuery,
473) -> PlannerRouteProfile {
474    let secondary_order_contract = plan
475        .scalar_plan()
476        .order
477        .as_ref()
478        .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
479
480    PlannerRouteProfile::new(
481        derive_continuation_policy_validated(plan),
482        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
483        secondary_order_contract,
484    )
485}
486
487fn project_static_planning_shape_for_model(
488    model: &EntityModel,
489    schema_info: &SchemaInfo,
490    plan: &AccessPlannedQuery,
491) -> Result<StaticPlanningShape, InternalError> {
492    let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
493    let execution_preparation_predicate = plan.execution_preparation_predicate();
494    let residual_filter_predicate = derive_residual_filter_predicate(plan);
495    let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
496    let execution_preparation_compiled_predicate =
497        compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
498    let effective_runtime_filter_program = compile_effective_runtime_filter_program(
499        schema_info,
500        residual_filter_expr.as_ref(),
501        residual_filter_predicate.as_ref(),
502    )?;
503    let scalar_projection_plan = if plan.grouped_plan().is_none() {
504        Some(
505                compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
506                    .ok_or_else(|| {
507                        InternalError::query_executor_invariant(
508                            "scalar projection program must compile during static planning finalization",
509                        )
510                    })?
511                    .iter()
512                    .map(CompiledExpr::compile)
513                    .collect(),
514            )
515    } else {
516        None
517    };
518    let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
519        resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
520    let projection_direct_slots = lower_direct_projection_slots_with_schema(
521        model,
522        schema_info,
523        &plan.logical,
524        &plan.projection_selection,
525    );
526    let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
527        model,
528        schema_info,
529        &plan.logical,
530        &plan.projection_selection,
531    );
532    let projection_referenced_slots =
533        projection_spec.referenced_slots_for_schema(model, schema_info)?;
534    let projected_slot_mask =
535        projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
536    let projection_is_model_identity = projection_spec.is_model_identity_for(model);
537    let resolved_order = resolved_order_for_plan(schema_info, plan)?;
538    let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
539    let slot_map = slot_map_for_schema_plan(schema_info, plan);
540    let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
541
542    Ok(StaticPlanningShape {
543        primary_key_name: model.primary_key.name,
544        projection_spec,
545        execution_preparation_predicate,
546        residual_filter_expr,
547        residual_filter_predicate,
548        execution_preparation_compiled_predicate,
549        effective_runtime_filter_program,
550        scalar_projection_plan,
551        grouped_aggregate_execution_specs,
552        grouped_distinct_execution_strategy,
553        projection_direct_slots,
554        projection_data_row_direct_slots,
555        projection_referenced_slots,
556        projected_slot_mask,
557        projection_is_model_identity,
558        resolved_order,
559        order_referenced_slots,
560        slot_map,
561        index_compile_targets,
562    })
563}
564
565// Compile the executor-owned residual scalar filter contract once from the
566// planner-derived residual artifacts so runtime never has to rediscover
567// residual presence or shape from semantic/filter/pushdown state.
568fn compile_effective_runtime_filter_program(
569    schema_info: &SchemaInfo,
570    residual_filter_expr: Option<&Expr>,
571    residual_filter_predicate: Option<&Predicate>,
572) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
573    // Keep the existing predicate fast path when the residual semantics still
574    // fit the derived predicate contract. The expression-owned lane is only
575    // needed once pushdown loses semantic coverage and a residual predicate no
576    // longer exists.
577    if let Some(predicate) = residual_filter_predicate {
578        return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
579            PredicateProgram::compile_with_schema_info(schema_info, predicate),
580        )));
581    }
582
583    if let Some(filter_expr) = residual_filter_expr {
584        let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
585            .ok_or_else(|| {
586                InternalError::query_invalid_logical_plan(
587                    "effective runtime scalar filter expression must compile during static planning finalization",
588                )
589            })?;
590
591        return Ok(Some(EffectiveRuntimeFilterProgram::expression(
592            CompiledExpr::compile(&compiled),
593        )));
594    }
595
596    Ok(None)
597}
598
599// Derive the executor-preparation predicate once from the selected access path.
600// This strips only filtered-index guard clauses while preserving access-bound
601// equalities that still matter to preparation/explain consumers.
602fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
603    let query_predicate = plan.scalar_plan().predicate.as_ref()?;
604
605    match plan.access.selected_index_contract() {
606        Some(index) => {
607            residual_query_predicate_after_filtered_access_contract(index, query_predicate)
608        }
609        None => Some(query_predicate.clone()),
610    }
611}
612
613// Derive the final residual predicate once from the already-filtered
614// preparation predicate plus any equality bounds guaranteed by the concrete
615// access path.
616fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
617    let filtered_residual = derive_execution_preparation_predicate(plan);
618    let filtered_residual = filtered_residual.as_ref()?;
619
620    residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
621}
622
623// Derive the explicit residual semantic expression once for finalized plans.
624// The residual expression remains the planner-owned semantic filter when any
625// runtime filtering still survives access satisfaction.
626fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
627    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
628    if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
629        return None;
630    }
631
632    Some(filter_expr.clone())
633}
634
635// Derive the explicit residual semantic expression during finalization using
636// the trusted entity schema so compare-family literal normalization matches the
637// planner-owned predicate contract before residual ownership is decided.
638fn derive_residual_filter_expr_for_model(
639    model: &EntityModel,
640    plan: &AccessPlannedQuery,
641) -> Option<Expr> {
642    let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
643    if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
644        return None;
645    }
646
647    Some(filter_expr.clone())
648}
649
650// Return whether any residual filtering survives after access planning. This
651// helper exists only for pre-finalization assembly; finalized plans must read
652// the explicit residual artifacts frozen in `StaticPlanningShape`.
653fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
654    match (
655        plan.scalar_plan().filter_expr.as_ref(),
656        plan.scalar_plan().predicate.as_ref(),
657    ) {
658        (None, None) => false,
659        (Some(_), None) => true,
660        (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
661    }
662}
663
664// Return true when the planner-owned predicate contract is fully satisfied by
665// access planning and no semantic residual filter expression survives.
666fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
667    plan.scalar_plan().predicate.is_some()
668        && derive_residual_filter_predicate(plan).is_none()
669        && derive_residual_filter_expr(plan).is_none()
670}
671
672// Return true when the semantic filter expression is entirely represented by
673// the planner-owned predicate contract and the chosen access path satisfies
674// that predicate without any runtime remainder.
675const fn derive_semantic_filter_fully_satisfied_by_access_contract(
676    plan: &AccessPlannedQuery,
677) -> bool {
678    plan.scalar_plan().filter_expr.is_some()
679        && plan.scalar_plan().predicate.is_some()
680        && plan.scalar_plan().predicate_covers_filter_expr
681}
682
683// Return true when finalized planning can prove that the semantic filter
684// expression is completely represented by the planner-owned predicate contract
685// after aligning compare literals through the trusted entity schema.
686const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
687    _model: &EntityModel,
688    plan: &AccessPlannedQuery,
689) -> bool {
690    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
691}
692
693// Compile one optional planner-frozen predicate program while keeping the
694// static planning assembly path free of repeated `Option` mapping boilerplate.
695fn compile_optional_predicate(
696    schema_info: &SchemaInfo,
697    predicate: Option<&Predicate>,
698) -> Option<PredicateProgram> {
699    predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
700}
701
702// Resolve the grouped-only static planning semantics bundle once so grouped
703// aggregate execution specs and grouped DISTINCT strategy stay derived under
704// one shared grouped-plan branch.
705fn resolve_grouped_static_planning_semantics(
706    schema_info: &SchemaInfo,
707    plan: &AccessPlannedQuery,
708    projection_spec: &ProjectionSpec,
709) -> Result<
710    (
711        Option<Vec<GroupedAggregateExecutionSpec>>,
712        Option<GroupedDistinctExecutionStrategy>,
713    ),
714    InternalError,
715> {
716    let Some(grouped) = plan.grouped_plan() else {
717        return Ok((None, None));
718    };
719
720    let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
721        projection_spec,
722        grouped.group.group_fields.as_slice(),
723        grouped.group.aggregates.as_slice(),
724    )?;
725    extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
726
727    let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
728        schema_info,
729        aggregate_specs.as_slice(),
730    )?);
731    let grouped_distinct_execution_strategy = Some(
732        resolved_grouped_distinct_execution_strategy_with_schema_info(
733            schema_info,
734            grouped.group.group_fields.as_slice(),
735            grouped.group.aggregates.as_slice(),
736            grouped.having_expr.as_ref(),
737        )?,
738    );
739
740    Ok((
741        grouped_aggregate_execution_specs,
742        grouped_distinct_execution_strategy,
743    ))
744}
745
746fn extend_grouped_having_aggregate_specs(
747    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
748    grouped: &GroupPlan,
749) -> Result<(), InternalError> {
750    if let Some(having_expr) = grouped.having_expr.as_ref() {
751        collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
752    }
753
754    Ok(())
755}
756
757fn collect_grouped_having_expr_aggregate_specs(
758    aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
759    expr: &Expr,
760) -> Result<(), InternalError> {
761    if !expr.contains_aggregate() {
762        return Ok(());
763    }
764
765    expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
766        let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
767
768        if aggregate_specs
769            .iter()
770            .all(|current| current != &aggregate_spec)
771        {
772            aggregate_specs.push(aggregate_spec);
773        }
774
775        Ok(())
776    })
777}
778
779fn projected_slot_mask_for_spec(
780    model: &EntityModel,
781    direct_projection_slots: Option<&[usize]>,
782) -> Vec<bool> {
783    let schema_slot_len = direct_projection_slots
784        .and_then(|slots| slots.iter().copied().max())
785        .map_or(0, |slot| slot.saturating_add(1));
786    let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
787
788    let Some(direct_projection_slots) = direct_projection_slots else {
789        return projected_slots;
790    };
791
792    for slot in direct_projection_slots.iter().copied() {
793        if let Some(projected) = projected_slots.get_mut(slot) {
794            *projected = true;
795        }
796    }
797
798    projected_slots
799}
800
801fn resolved_order_for_plan(
802    schema_info: &SchemaInfo,
803    plan: &AccessPlannedQuery,
804) -> Result<Option<ResolvedOrder>, InternalError> {
805    if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
806        return Ok(None);
807    }
808
809    let Some(order) = plan.scalar_plan().order.as_ref() else {
810        return Ok(None);
811    };
812
813    let mut fields = Vec::with_capacity(order.fields.len());
814    for term in &order.fields {
815        fields.push(ResolvedOrderField::new(
816            resolved_order_value_source_for_term(schema_info, term)?,
817            term.direction(),
818        ));
819    }
820
821    Ok(Some(ResolvedOrder::new(fields)))
822}
823
824fn resolved_order_value_source_for_term(
825    schema_info: &SchemaInfo,
826    term: &crate::db::query::plan::OrderTerm,
827) -> Result<ResolvedOrderValueSource, InternalError> {
828    if term.direct_field().is_none() {
829        let rendered = term.rendered_label();
830        validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
831        let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
832            .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
833
834        return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
835            &compiled,
836        )));
837    }
838
839    let field = term
840        .direct_field()
841        .expect("direct-field order branch should only execute for field-backed terms");
842    let slot = resolve_required_schema_slot(schema_info, field, || {
843        InternalError::query_invalid_logical_plan(format!(
844            "order expression references unknown field '{field}'",
845        ))
846    })?;
847
848    Ok(ResolvedOrderValueSource::direct_field(slot))
849}
850
851fn validate_resolved_order_expr_fields(
852    schema_info: &SchemaInfo,
853    expr: &Expr,
854    rendered: &str,
855) -> Result<(), InternalError> {
856    expr.try_for_each_tree_expr(&mut |node| match node {
857        Expr::Field(field_id) => {
858            resolve_required_schema_slot(schema_info, field_id.as_str(), || {
859                InternalError::query_invalid_logical_plan(format!(
860                    "order expression references unknown field '{rendered}'",
861                ))
862            })
863            .map(|_| ())
864        }
865        Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
866        #[cfg(test)]
867        Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
868        Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
869        _ => Ok(()),
870    })
871}
872
873// Resolve one schema-authoritative field slot while keeping planner
874// invalid-logical-plan error construction at the callsite that owns the
875// diagnostic wording.
876fn resolve_required_schema_slot<F>(
877    schema_info: &SchemaInfo,
878    field: &str,
879    invalid_plan_error: F,
880) -> Result<usize, InternalError>
881where
882    F: FnOnce() -> InternalError,
883{
884    schema_info
885        .field_slot_index(field)
886        .ok_or_else(invalid_plan_error)
887}
888
889// Keep the scalar-order expression seam violation text under one helper so the
890// parse validation and compile validation paths do not drift.
891fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
892    InternalError::query_invalid_logical_plan(format!(
893        "order expression '{rendered}' did not stay on the scalar expression seam",
894    ))
895}
896
897// Keep one stable executor-facing slot list for grouped order terms after the
898// planner has frozen the structural `ResolvedOrder`. The grouped Top-K route
899// now consumes this same referenced-slot contract instead of re-deriving order
900// sources from planner strategy at runtime.
901fn order_referenced_slots_for_resolved_order(
902    resolved_order: Option<&ResolvedOrder>,
903) -> Option<Vec<usize>> {
904    Some(resolved_order?.referenced_slots())
905}
906
907fn slot_map_for_schema_plan(
908    schema_info: &SchemaInfo,
909    plan: &AccessPlannedQuery,
910) -> Option<Vec<usize>> {
911    let executable = plan.access.executable_contract();
912
913    resolved_index_slots_for_access_path(schema_info, &executable)
914}
915
916fn resolved_index_slots_for_access_path(
917    schema_info: &SchemaInfo,
918    access: &ExecutableAccessPlan<'_, crate::value::Value>,
919) -> Option<Vec<usize>> {
920    let path = access.as_path()?;
921    let path_capabilities = path.capabilities();
922    let key_items = path_capabilities.index_key_items_for_slot_map()?;
923    let mut slots = Vec::new();
924
925    match key_items {
926        IndexKeyItemsRef::Fields(fields) => {
927            slots.reserve(fields.len());
928            for &field_name in fields {
929                let slot = schema_info.field_slot_index(field_name)?;
930                slots.push(slot);
931            }
932        }
933        IndexKeyItemsRef::Items(items) => {
934            slots.reserve(items.len());
935            for key_item in items {
936                let slot = schema_info.field_slot_index(key_item.field())?;
937                slots.push(slot);
938            }
939        }
940    }
941
942    Some(slots)
943}
944
945fn index_compile_targets_for_schema_plan(
946    schema_info: &SchemaInfo,
947    plan: &AccessPlannedQuery,
948) -> Option<Vec<IndexCompileTarget>> {
949    let executable = plan.access.executable_contract();
950    let path = executable.as_path()?;
951    let key_items = path.capabilities().index_key_items_for_slot_map()?;
952    let mut targets = Vec::new();
953
954    match key_items {
955        IndexKeyItemsRef::Fields(fields) => {
956            for (component_index, &field_name) in fields.iter().enumerate() {
957                let field_slot = schema_info.field_slot_index(field_name)?;
958                targets.push(IndexCompileTarget {
959                    component_index,
960                    field_slot,
961                    key_item: crate::model::index::IndexKeyItem::Field(field_name),
962                });
963            }
964        }
965        IndexKeyItemsRef::Items(items) => {
966            for (component_index, &key_item) in items.iter().enumerate() {
967                let field_slot = schema_info.field_slot_index(key_item.field())?;
968                targets.push(IndexCompileTarget {
969                    component_index,
970                    field_slot,
971                    key_item,
972                });
973            }
974        }
975    }
976
977    Some(targets)
978}