1#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9 db::{
10 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11 predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12 query::plan::{
13 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16 LogicalPlan, PlannerRouteProfile, PredicatePushdownDiagnostics, QueryMode,
17 ResidualFilterContract, ResidualFilterShape, ResolvedOrder, ResolvedOrderField,
18 ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
19 derive_logical_pushdown_eligibility,
20 expr::{
21 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
22 compile_scalar_projection_plan_with_schema,
23 },
24 extend_unique_grouped_aggregate_specs_from_expr, grouped_aggregate_execution_specs,
25 grouped_aggregate_specs_from_projection_spec, grouped_cursor_policy_violation,
26 grouped_plan_strategy, lower_data_row_direct_projection_slots_with_schema,
27 lower_direct_projection_slots_with_schema, lower_projection_identity,
28 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
29 residual_query_predicate_after_filtered_access_contract,
30 resolved_grouped_distinct_execution_strategy_with_schema_info,
31 },
32 schema::SchemaInfo,
33 },
34 error::InternalError,
35 model::{
36 entity::EntityModel,
37 index::{IndexKeyItem, IndexKeyItemsRef},
38 },
39};
40
41impl QueryMode {
42 #[must_use]
44 pub const fn is_load(&self) -> bool {
45 match self {
46 Self::Load(_) => true,
47 Self::Delete(_) => false,
48 }
49 }
50
51 #[must_use]
53 pub const fn is_delete(&self) -> bool {
54 match self {
55 Self::Delete(_) => true,
56 Self::Load(_) => false,
57 }
58 }
59}
60
61impl LogicalPlan {
62 #[must_use]
64 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65 match self {
66 Self::Scalar(plan) => plan,
67 Self::Grouped(plan) => &plan.scalar,
68 }
69 }
70
71 #[must_use]
73 #[cfg(test)]
74 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75 match self {
76 Self::Scalar(plan) => plan,
77 Self::Grouped(plan) => &mut plan.scalar,
78 }
79 }
80
81 #[must_use]
83 #[cfg(test)]
84 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85 self.scalar_semantics()
86 }
87
88 #[must_use]
90 #[cfg(test)]
91 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92 self.scalar_semantics_mut()
93 }
94}
95
96impl AccessPlannedQuery {
97 #[must_use]
99 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100 self.logical.scalar_semantics()
101 }
102
103 #[must_use]
106 #[cfg(any(test, feature = "sql"))]
107 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
108 self.scalar_plan().consistency
109 }
110
111 #[must_use]
113 #[cfg(test)]
114 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
115 self.logical.scalar_semantics_mut()
116 }
117
118 #[must_use]
120 #[cfg(test)]
121 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
122 self.scalar_plan()
123 }
124
125 #[must_use]
127 #[cfg(test)]
128 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
129 self.scalar_plan_mut()
130 }
131
132 #[must_use]
134 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
135 match &self.logical {
136 LogicalPlan::Scalar(_) => None,
137 LogicalPlan::Grouped(plan) => Some(plan),
138 }
139 }
140
141 #[must_use]
143 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
144 if let Some(static_contract) = &self.static_execution_planning_contract {
145 return static_contract.projection_spec.clone();
146 }
147
148 lower_projection_intent(model, &self.logical, &self.projection_selection)
149 }
150
151 #[must_use]
153 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
154 lower_projection_identity(&self.logical, &self.projection_selection)
155 }
156
157 #[must_use]
163 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
164 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
165 return static_contract.execution_preparation_predicate.clone();
166 }
167
168 derive_execution_preparation_predicate(self)
169 }
170
171 #[must_use]
175 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
176 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
177 return static_contract
178 .residual_filter_contract
179 .residual_filter_predicate()
180 .cloned();
181 }
182
183 derive_residual_filter_predicate(self)
184 }
185
186 #[must_use]
189 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
190 self.effective_execution_predicate().is_some()
191 }
192
193 #[must_use]
196 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
197 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
198 return static_contract
199 .residual_filter_contract
200 .residual_filter_expr();
201 }
202
203 if !derive_has_residual_filter(self) {
204 return None;
205 }
206
207 self.scalar_plan().filter_expr.as_ref()
208 }
209
210 #[must_use]
213 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
214 self.residual_filter_expr().is_some()
215 }
216
217 #[must_use]
219 pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
220 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
221 return static_contract.residual_filter_contract.shape();
222 }
223
224 ResidualFilterShape::from_presence(
225 self.residual_filter_expr().is_some(),
226 self.effective_execution_predicate().is_some(),
227 )
228 }
229
230 #[must_use]
233 pub(in crate::db) fn predicate_pushdown_label(&self) -> String {
234 self.predicate_pushdown_diagnostics().label()
235 }
236
237 #[must_use]
239 pub(in crate::db) fn predicate_pushdown_diagnostics(&self) -> PredicatePushdownDiagnostics {
240 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
241 return static_contract.predicate_pushdown_diagnostics;
242 }
243
244 derive_predicate_pushdown_diagnostics(self, self.residual_filter_shape())
245 }
246
247 #[must_use]
249 pub(in crate::db) fn predicate_pushdown_outcome_label(&self) -> &'static str {
250 self.predicate_pushdown_diagnostics().outcome_label()
251 }
252
253 #[must_use]
255 pub(in crate::db) fn predicate_pushdown_reason_label(&self) -> &'static str {
256 self.predicate_pushdown_diagnostics().reason_label()
257 }
258
259 #[must_use]
261 pub(in crate::db) const fn execution_preparation_compiled_predicate(
262 &self,
263 ) -> Option<&PredicateProgram> {
264 self.static_execution_planning_contract()
265 .execution_preparation_compiled_predicate
266 .as_ref()
267 }
268
269 #[must_use]
271 pub(in crate::db) const fn effective_runtime_compiled_predicate(
272 &self,
273 ) -> Option<&PredicateProgram> {
274 match self
275 .static_execution_planning_contract()
276 .residual_filter_contract
277 .effective_runtime_filter_program()
278 {
279 Some(program) => program.predicate_program(),
280 None => None,
281 }
282 }
283
284 #[cfg(test)]
286 #[must_use]
287 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
288 &self,
289 ) -> Option<&CompiledExpr> {
290 match self
291 .static_execution_planning_contract()
292 .residual_filter_contract
293 .effective_runtime_filter_program()
294 {
295 Some(program) => program.expression_filter(),
296 None => None,
297 }
298 }
299
300 #[must_use]
302 pub(in crate::db) const fn effective_runtime_filter_program(
303 &self,
304 ) -> Option<&EffectiveRuntimeFilterProgram> {
305 self.static_execution_planning_contract()
306 .residual_filter_contract
307 .effective_runtime_filter_program()
308 }
309
310 #[must_use]
312 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
313 if !self.scalar_plan().distinct {
314 return DistinctExecutionStrategy::None;
315 }
316
317 match distinct_runtime_dedup_strategy(&self.access) {
321 Some(strategy) => strategy,
322 None => DistinctExecutionStrategy::None,
323 }
324 }
325
326 #[cfg(test)]
328 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
329 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
330 }
331
332 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
334 &mut self,
335 schema_info: &SchemaInfo,
336 ) {
337 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
338 }
339
340 #[cfg(test)]
342 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
343 &mut self,
344 model: &EntityModel,
345 ) -> Result<(), InternalError> {
346 self.finalize_static_execution_planning_contract_for_model_with_schema(
347 model,
348 SchemaInfo::cached_for_generated_entity_model(model),
349 )
350 }
351
352 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
354 &mut self,
355 model: &EntityModel,
356 schema_info: &SchemaInfo,
357 ) -> Result<(), InternalError> {
358 self.static_execution_planning_contract = Some(
359 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
360 );
361
362 Ok(())
363 }
364
365 #[must_use]
367 pub(in crate::db) fn execution_shape_signature(
368 &self,
369 entity_path: &'static str,
370 ) -> ExecutionShapeSignature {
371 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
372 }
373
374 #[must_use]
377 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
378 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
379 return self.scalar_plan().predicate.is_some()
380 && !static_contract
381 .residual_filter_contract
382 .has_residual_filter();
383 }
384
385 derive_predicate_fully_satisfied_by_access_contract(self)
386 }
387
388 #[must_use]
390 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
391 self.static_execution_planning_contract()
392 .scalar_projection_plan
393 .as_deref()
394 }
395
396 #[must_use]
398 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
399 self.static_execution_planning_contract.is_some()
400 }
401
402 #[must_use]
404 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
405 self.static_execution_planning_contract()
406 .primary_key_names
407 .iter()
408 .map(String::as_str)
409 .collect()
410 }
411
412 #[must_use]
414 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
415 self.static_execution_planning_contract()
416 .projection_referenced_slots
417 .as_slice()
418 }
419
420 #[must_use]
422 #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
423 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
424 self.static_execution_planning_contract()
425 .projected_slot_mask
426 .as_slice()
427 }
428
429 #[must_use]
431 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
432 self.static_execution_planning_contract()
433 .projection_is_model_identity
434 }
435
436 #[must_use]
438 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
439 self.static_execution_planning_contract()
440 .order_referenced_slots
441 .as_deref()
442 }
443
444 #[must_use]
446 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
447 self.static_execution_planning_contract()
448 .resolved_order
449 .as_ref()
450 }
451
452 #[must_use]
454 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
455 self.static_execution_planning_contract()
456 .slot_map
457 .as_deref()
458 }
459
460 #[must_use]
462 pub(in crate::db) fn grouped_aggregate_execution_specs(
463 &self,
464 ) -> Option<&[GroupedAggregateExecutionSpec]> {
465 self.static_execution_planning_contract()
466 .grouped_aggregate_execution_specs
467 .as_deref()
468 }
469
470 #[must_use]
472 pub(in crate::db) const fn grouped_distinct_execution_strategy(
473 &self,
474 ) -> Option<&GroupedDistinctExecutionStrategy> {
475 self.static_execution_planning_contract()
476 .grouped_distinct_execution_strategy
477 .as_ref()
478 }
479
480 #[must_use]
482 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
483 &self.static_execution_planning_contract().projection_spec
484 }
485
486 #[must_use]
488 #[cfg(any(test, feature = "sql"))]
489 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
490 self.static_execution_planning_contract()
491 .projection_direct_slots
492 .as_deref()
493 }
494
495 #[must_use]
497 #[cfg(any(test, feature = "sql"))]
498 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
499 self.static_execution_planning_contract()
500 .projection_data_row_direct_slots
501 .as_deref()
502 }
503
504 #[must_use]
506 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
507 self.static_execution_planning_contract()
508 .index_compile_targets
509 .as_deref()
510 }
511
512 const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
513 self.static_execution_planning_contract
514 .as_ref()
515 .expect("query semantics invariant")
516 }
517}
518
519fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
520 match access {
521 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
522 Some(DistinctExecutionStrategy::PreOrdered)
523 }
524 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
525 Some(DistinctExecutionStrategy::HashMaterialize)
526 }
527 AccessPlan::Path(_) => None,
528 }
529}
530
531fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
532 let is_grouped_safe = plan
533 .grouped_plan()
534 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
535
536 ContinuationPolicy::new(
537 true, true, is_grouped_safe,
540 )
541}
542
543#[must_use]
545#[cfg(test)]
546pub(in crate::db) fn project_planner_route_profile_for_model(
547 model: &EntityModel,
548 plan: &AccessPlannedQuery,
549) -> PlannerRouteProfile {
550 let primary_key_names = ordered_primary_key_names(model);
551 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
552 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
553 });
554
555 PlannerRouteProfile::new(
556 derive_continuation_policy_validated(plan),
557 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
558 secondary_order_contract,
559 )
560}
561
562#[must_use]
564pub(in crate::db) fn project_planner_route_profile_for_schema(
565 schema_info: &SchemaInfo,
566 plan: &AccessPlannedQuery,
567) -> PlannerRouteProfile {
568 let primary_key_names = primary_key_names_from_schema(schema_info);
569 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
570 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
571 });
572
573 PlannerRouteProfile::new(
574 derive_continuation_policy_validated(plan),
575 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
576 secondary_order_contract,
577 )
578}
579
580fn project_static_execution_planning_contract_for_model(
581 model: &EntityModel,
582 schema_info: &SchemaInfo,
583 plan: &AccessPlannedQuery,
584) -> Result<StaticExecutionPlanningContract, InternalError> {
585 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
586 let execution_preparation_predicate = plan.execution_preparation_predicate();
587 let residual_filter_predicate = derive_residual_filter_predicate(plan);
588 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
589 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
590 schema_info,
591 residual_filter_expr.as_ref(),
592 residual_filter_predicate.as_ref(),
593 )?;
594 let residual_filter_contract = ResidualFilterContract::new(
595 residual_filter_expr,
596 residual_filter_predicate,
597 effective_runtime_filter_program,
598 );
599 let residual_filter_shape = residual_filter_contract.shape();
600 let execution_preparation_compiled_predicate =
601 should_compile_execution_preparation_predicate(residual_filter_shape)
602 .then(|| {
603 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref())
604 })
605 .flatten();
606 let predicate_pushdown_diagnostics =
607 derive_predicate_pushdown_diagnostics(plan, residual_filter_shape);
608 let scalar_projection_plan = if plan.grouped_plan().is_none() {
609 Some(
610 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
611 .ok_or_else(InternalError::query_executor_invariant)?
612 .iter()
613 .map(CompiledExpr::compile)
614 .collect(),
615 )
616 } else {
617 None
618 };
619 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
620 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
621 let projection_direct_slots = lower_direct_projection_slots_with_schema(
622 model,
623 schema_info,
624 &plan.logical,
625 &plan.projection_selection,
626 );
627 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
628 model,
629 schema_info,
630 &plan.logical,
631 &plan.projection_selection,
632 );
633 let projection_referenced_slots =
634 projection_spec.referenced_slots_for_schema(model, schema_info)?;
635 let projected_slot_mask =
636 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
637 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
638 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
639 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
640 let slot_map = slot_map_for_schema_plan(schema_info, plan);
641 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
642
643 Ok(StaticExecutionPlanningContract {
644 primary_key_names: schema_info.primary_key_names().to_vec(),
645 projection_spec,
646 execution_preparation_predicate,
647 execution_preparation_compiled_predicate,
648 residual_filter_contract,
649 predicate_pushdown_diagnostics,
650 scalar_projection_plan,
651 grouped_aggregate_execution_specs,
652 grouped_distinct_execution_strategy,
653 projection_direct_slots,
654 projection_data_row_direct_slots,
655 projection_referenced_slots,
656 projected_slot_mask,
657 projection_is_model_identity,
658 resolved_order,
659 order_referenced_slots,
660 slot_map,
661 index_compile_targets,
662 })
663}
664
665#[cfg(test)]
666fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
667 model.primary_key_names()
668}
669
670fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
671 schema_info
672 .primary_key_names()
673 .iter()
674 .map(String::as_str)
675 .collect()
676}
677
678fn compile_effective_runtime_filter_program(
682 schema_info: &SchemaInfo,
683 residual_filter_expr: Option<&Expr>,
684 residual_filter_predicate: Option<&Predicate>,
685) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
686 if let Some(predicate) = residual_filter_predicate {
691 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
692 PredicateProgram::compile_with_schema_info(schema_info, predicate),
693 )));
694 }
695
696 if let Some(filter_expr) = residual_filter_expr {
697 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
698 .ok_or_else(InternalError::query_invalid_logical_plan)?;
699
700 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
701 CompiledExpr::compile(&compiled),
702 )));
703 }
704
705 Ok(None)
706}
707
708fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
712 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
713
714 match plan.access.selected_index_contract() {
715 Some(index) => {
716 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
717 }
718 None => Some(query_predicate.clone()),
719 }
720}
721
722fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
726 let filtered_residual = derive_execution_preparation_predicate(plan);
727 let filtered_residual = filtered_residual.as_ref()?;
728
729 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
730}
731
732fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
736 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
737 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
738 return None;
739 }
740
741 Some(filter_expr.clone())
742}
743
744fn derive_residual_filter_expr_for_model(
748 model: &EntityModel,
749 plan: &AccessPlannedQuery,
750) -> Option<Expr> {
751 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
752 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
753 return None;
754 }
755
756 Some(filter_expr.clone())
757}
758
759fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
763 match (
764 plan.scalar_plan().filter_expr.as_ref(),
765 plan.scalar_plan().predicate.as_ref(),
766 ) {
767 (None, None) => false,
768 (Some(_), None) => true,
769 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
770 }
771}
772
773fn derive_predicate_pushdown_diagnostics(
777 plan: &AccessPlannedQuery,
778 residual_filter_shape: ResidualFilterShape,
779) -> PredicatePushdownDiagnostics {
780 PredicatePushdownDiagnostics::from_plan(
781 plan.scalar_plan().filter_expr.is_some(),
782 plan.scalar_plan().predicate_covers_filter_expr,
783 plan.scalar_plan().predicate.as_ref(),
784 &plan.access,
785 residual_filter_shape,
786 )
787}
788
789fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
792 plan.scalar_plan().predicate.is_some()
793 && derive_residual_filter_predicate(plan).is_none()
794 && derive_residual_filter_expr(plan).is_none()
795}
796
797const fn derive_semantic_filter_fully_satisfied_by_access_contract(
801 plan: &AccessPlannedQuery,
802) -> bool {
803 plan.scalar_plan().filter_expr.is_some()
804 && plan.scalar_plan().predicate.is_some()
805 && plan.scalar_plan().predicate_covers_filter_expr
806}
807
808const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
812 _model: &EntityModel,
813 plan: &AccessPlannedQuery,
814) -> bool {
815 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
816}
817
818fn compile_optional_predicate(
821 schema_info: &SchemaInfo,
822 predicate: Option<&Predicate>,
823) -> Option<PredicateProgram> {
824 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
825}
826
827const fn should_compile_execution_preparation_predicate(
832 residual_filter_shape: ResidualFilterShape,
833) -> bool {
834 !residual_filter_shape.is_absent()
835}
836
837fn resolve_grouped_static_planning_semantics(
841 schema_info: &SchemaInfo,
842 plan: &AccessPlannedQuery,
843 projection_spec: &ProjectionSpec,
844) -> Result<
845 (
846 Option<Vec<GroupedAggregateExecutionSpec>>,
847 Option<GroupedDistinctExecutionStrategy>,
848 ),
849 InternalError,
850> {
851 let Some(grouped) = plan.grouped_plan() else {
852 return Ok((None, None));
853 };
854
855 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
856 projection_spec,
857 grouped.group.group_fields.as_slice(),
858 grouped.group.aggregates.as_slice(),
859 )?;
860 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
861
862 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
863 schema_info,
864 aggregate_specs.as_slice(),
865 )?);
866 let grouped_distinct_execution_strategy = Some(
867 resolved_grouped_distinct_execution_strategy_with_schema_info(
868 schema_info,
869 grouped.group.group_fields.as_slice(),
870 grouped.group.aggregates.as_slice(),
871 grouped.having_expr.as_ref(),
872 )?,
873 );
874
875 Ok((
876 grouped_aggregate_execution_specs,
877 grouped_distinct_execution_strategy,
878 ))
879}
880
881fn extend_grouped_having_aggregate_specs(
882 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
883 grouped: &GroupPlan,
884) -> Result<(), InternalError> {
885 if let Some(having_expr) = grouped.having_expr.as_ref() {
886 extend_unique_grouped_aggregate_specs_from_expr(aggregate_specs, having_expr)?;
887 }
888
889 Ok(())
890}
891
892fn projected_slot_mask_for_spec(
893 model: &EntityModel,
894 direct_projection_slots: Option<&[usize]>,
895) -> Vec<bool> {
896 let schema_slot_len = direct_projection_slots
897 .and_then(|slots| slots.iter().copied().max())
898 .map_or(0, |slot| slot.saturating_add(1));
899 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
900
901 let Some(direct_projection_slots) = direct_projection_slots else {
902 return projected_slots;
903 };
904
905 for slot in direct_projection_slots.iter().copied() {
906 if let Some(projected) = projected_slots.get_mut(slot) {
907 *projected = true;
908 }
909 }
910
911 projected_slots
912}
913
914fn resolved_order_for_plan(
915 schema_info: &SchemaInfo,
916 plan: &AccessPlannedQuery,
917) -> Result<Option<ResolvedOrder>, InternalError> {
918 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
919 return Ok(None);
920 }
921
922 let Some(order) = plan.scalar_plan().order.as_ref() else {
923 return Ok(None);
924 };
925
926 let mut fields = Vec::with_capacity(order.fields.len());
927 for term in &order.fields {
928 fields.push(ResolvedOrderField::new(
929 resolved_order_value_source_for_term(schema_info, term)?,
930 term.direction(),
931 ));
932 }
933
934 Ok(Some(ResolvedOrder::new(fields)))
935}
936
937fn resolved_order_value_source_for_term(
938 schema_info: &SchemaInfo,
939 term: &crate::db::query::plan::OrderTerm,
940) -> Result<ResolvedOrderValueSource, InternalError> {
941 if term.direct_field().is_none() {
942 let rendered = term.rendered_label();
943 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
944 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
945 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
946
947 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
948 &compiled,
949 )));
950 }
951
952 let field = term.direct_field().expect("query semantics invariant");
953 let slot = resolve_required_schema_slot(
954 schema_info,
955 field,
956 InternalError::query_invalid_logical_plan,
957 )?;
958
959 Ok(ResolvedOrderValueSource::direct_field(slot))
960}
961
962fn validate_resolved_order_expr_fields(
963 schema_info: &SchemaInfo,
964 expr: &Expr,
965 rendered: &str,
966) -> Result<(), InternalError> {
967 expr.try_for_each_tree_expr(&mut |node| match node {
968 Expr::Field(field_id) => resolve_required_schema_slot(
969 schema_info,
970 field_id.as_str(),
971 InternalError::query_invalid_logical_plan,
972 )
973 .map(|_| ()),
974 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
975 #[cfg(test)]
976 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
977 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
978 _ => Ok(()),
979 })
980}
981
982fn resolve_required_schema_slot<F>(
986 schema_info: &SchemaInfo,
987 field: &str,
988 invalid_plan_error: F,
989) -> Result<usize, InternalError>
990where
991 F: FnOnce() -> InternalError,
992{
993 schema_info
994 .field_slot_index(field)
995 .ok_or_else(invalid_plan_error)
996}
997
998fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
1001 InternalError::query_invalid_logical_plan()
1002}
1003
1004fn order_referenced_slots_for_resolved_order(
1009 resolved_order: Option<&ResolvedOrder>,
1010) -> Option<Vec<usize>> {
1011 Some(resolved_order?.referenced_slots())
1012}
1013
1014fn slot_map_for_schema_plan(
1015 schema_info: &SchemaInfo,
1016 plan: &AccessPlannedQuery,
1017) -> Option<Vec<usize>> {
1018 let executable = plan.access.executable_contract();
1019
1020 resolved_index_slots_for_access_path(schema_info, &executable)
1021}
1022
1023fn resolved_index_slots_for_access_path(
1024 schema_info: &SchemaInfo,
1025 access: &ExecutableAccessPlan<'_, crate::value::Value>,
1026) -> Option<Vec<usize>> {
1027 let path = access.as_path()?;
1028 let path_facts = path.shape_facts();
1029 let key_items = path_facts.index_key_items_for_slot_map()?;
1030 let mut slots = Vec::new();
1031
1032 match key_items.key_items() {
1033 SemanticIndexKeyItemsRef::Fields(fields) => {
1034 slots.reserve(fields.len());
1035 for field_name in fields {
1036 let slot = schema_info.field_slot_index(field_name)?;
1037 slots.push(slot);
1038 }
1039 }
1040 SemanticIndexKeyItemsRef::Accepted(items) => {
1041 slots.reserve(items.len());
1042 for key_item in items {
1043 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1044 slots.push(slot);
1045 }
1046 }
1047 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1048 slots.reserve(fields.len());
1049 for &field_name in fields {
1050 let slot = schema_info.field_slot_index(field_name)?;
1051 slots.push(slot);
1052 }
1053 }
1054 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1055 slots.reserve(items.len());
1056 for key_item in items {
1057 let slot = schema_info.field_slot_index(key_item.field())?;
1058 slots.push(slot);
1059 }
1060 }
1061 }
1062
1063 Some(slots)
1064}
1065
1066fn index_compile_targets_for_schema_plan(
1067 schema_info: &SchemaInfo,
1068 plan: &AccessPlannedQuery,
1069) -> Option<Vec<IndexCompileTarget>> {
1070 let executable = plan.access.executable_contract();
1071 let path = executable.as_path()?;
1072 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1073 let mut targets = Vec::new();
1074
1075 match key_items.key_items() {
1076 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1077 return None;
1078 }
1079 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1080 for (component_index, &field_name) in fields.iter().enumerate() {
1081 let field_slot = schema_info.field_slot_index(field_name)?;
1082 targets.push(IndexCompileTarget {
1083 component_index,
1084 field_slot,
1085 key_item: IndexKeyItem::Field(field_name),
1086 });
1087 }
1088 }
1089 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1090 for (component_index, &key_item) in items.iter().enumerate() {
1091 let field_slot = schema_info.field_slot_index(key_item.field())?;
1092 targets.push(IndexCompileTarget {
1093 component_index,
1094 field_slot,
1095 key_item,
1096 });
1097 }
1098 }
1099 }
1100
1101 Some(targets)
1102}