1#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9 db::{
10 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11 predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12 query::plan::{
13 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16 LogicalPlan, PlannerRouteProfile, PredicatePushdownDiagnostics, QueryMode,
17 ResidualFilterContract, ResidualFilterShape, ResolvedOrder, ResolvedOrderField,
18 ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
19 derive_logical_pushdown_eligibility,
20 expr::{
21 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
22 compile_scalar_projection_plan_with_schema,
23 },
24 extend_unique_grouped_aggregate_specs_from_expr, grouped_aggregate_execution_specs,
25 grouped_aggregate_specs_from_projection_spec, grouped_cursor_policy_violation,
26 grouped_plan_strategy, lower_data_row_direct_projection_slots_with_schema,
27 lower_direct_projection_slots_with_schema, lower_projection_identity,
28 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
29 residual_query_predicate_after_filtered_access_contract,
30 resolved_grouped_distinct_execution_strategy_with_schema_info,
31 },
32 schema::SchemaInfo,
33 },
34 error::InternalError,
35 model::{
36 entity::EntityModel,
37 index::{IndexKeyItem, IndexKeyItemsRef},
38 },
39};
40
41impl QueryMode {
42 #[must_use]
44 pub const fn is_load(&self) -> bool {
45 match self {
46 Self::Load(_) => true,
47 Self::Delete(_) => false,
48 }
49 }
50
51 #[must_use]
53 pub const fn is_delete(&self) -> bool {
54 match self {
55 Self::Delete(_) => true,
56 Self::Load(_) => false,
57 }
58 }
59}
60
61impl LogicalPlan {
62 #[must_use]
64 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65 match self {
66 Self::Scalar(plan) => plan,
67 Self::Grouped(plan) => &plan.scalar,
68 }
69 }
70
71 #[must_use]
73 #[cfg(test)]
74 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75 match self {
76 Self::Scalar(plan) => plan,
77 Self::Grouped(plan) => &mut plan.scalar,
78 }
79 }
80
81 #[must_use]
83 #[cfg(test)]
84 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85 self.scalar_semantics()
86 }
87
88 #[must_use]
90 #[cfg(test)]
91 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92 self.scalar_semantics_mut()
93 }
94}
95
96impl AccessPlannedQuery {
97 #[must_use]
99 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100 self.logical.scalar_semantics()
101 }
102
103 #[must_use]
106 #[cfg(any(test, feature = "sql"))]
107 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
108 self.scalar_plan().consistency
109 }
110
111 #[must_use]
113 #[cfg(test)]
114 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
115 self.logical.scalar_semantics_mut()
116 }
117
118 #[must_use]
120 #[cfg(test)]
121 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
122 self.scalar_plan()
123 }
124
125 #[must_use]
127 #[cfg(test)]
128 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
129 self.scalar_plan_mut()
130 }
131
132 #[must_use]
134 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
135 match &self.logical {
136 LogicalPlan::Scalar(_) => None,
137 LogicalPlan::Grouped(plan) => Some(plan),
138 }
139 }
140
141 #[must_use]
143 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
144 if let Some(static_contract) = &self.static_execution_planning_contract {
145 return static_contract.projection_spec.clone();
146 }
147
148 lower_projection_intent(model, &self.logical, &self.projection_selection)
149 }
150
151 #[must_use]
153 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
154 lower_projection_identity(&self.logical, &self.projection_selection)
155 }
156
157 #[must_use]
163 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
164 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
165 return static_contract.execution_preparation_predicate.clone();
166 }
167
168 derive_execution_preparation_predicate(self)
169 }
170
171 #[must_use]
175 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
176 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
177 return static_contract
178 .residual_filter_contract
179 .residual_filter_predicate()
180 .cloned();
181 }
182
183 derive_residual_filter_predicate(self)
184 }
185
186 #[must_use]
189 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
190 self.effective_execution_predicate().is_some()
191 }
192
193 #[must_use]
196 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
197 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
198 return static_contract
199 .residual_filter_contract
200 .residual_filter_expr();
201 }
202
203 if !derive_has_residual_filter(self) {
204 return None;
205 }
206
207 self.scalar_plan().filter_expr.as_ref()
208 }
209
210 #[must_use]
213 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
214 self.residual_filter_expr().is_some()
215 }
216
217 #[must_use]
219 pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
220 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
221 return static_contract.residual_filter_contract.shape();
222 }
223
224 ResidualFilterShape::from_presence(
225 self.residual_filter_expr().is_some(),
226 self.effective_execution_predicate().is_some(),
227 )
228 }
229
230 #[must_use]
233 pub(in crate::db) fn predicate_pushdown_label(&self) -> String {
234 self.predicate_pushdown_diagnostics().label()
235 }
236
237 #[must_use]
239 pub(in crate::db) fn predicate_pushdown_diagnostics(&self) -> PredicatePushdownDiagnostics {
240 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
241 return static_contract.predicate_pushdown_diagnostics;
242 }
243
244 PredicatePushdownDiagnostics::from_plan(
245 self.scalar_plan().filter_expr.is_some(),
246 self.scalar_plan().predicate_covers_filter_expr,
247 self.scalar_plan().predicate.as_ref(),
248 &self.access,
249 self.residual_filter_shape(),
250 )
251 }
252
253 #[must_use]
255 pub(in crate::db) fn predicate_pushdown_outcome_label(&self) -> &'static str {
256 self.predicate_pushdown_diagnostics().outcome_label()
257 }
258
259 #[must_use]
261 pub(in crate::db) fn predicate_pushdown_reason_label(&self) -> &'static str {
262 self.predicate_pushdown_diagnostics().reason_label()
263 }
264
265 #[must_use]
267 pub(in crate::db) const fn execution_preparation_compiled_predicate(
268 &self,
269 ) -> Option<&PredicateProgram> {
270 self.static_execution_planning_contract()
271 .execution_preparation_compiled_predicate
272 .as_ref()
273 }
274
275 #[must_use]
277 pub(in crate::db) const fn effective_runtime_compiled_predicate(
278 &self,
279 ) -> Option<&PredicateProgram> {
280 match self
281 .static_execution_planning_contract()
282 .residual_filter_contract
283 .effective_runtime_filter_program()
284 {
285 Some(program) => program.predicate_program(),
286 None => None,
287 }
288 }
289
290 #[cfg(test)]
292 #[must_use]
293 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
294 &self,
295 ) -> Option<&CompiledExpr> {
296 match self
297 .static_execution_planning_contract()
298 .residual_filter_contract
299 .effective_runtime_filter_program()
300 {
301 Some(program) => program.expression_filter(),
302 None => None,
303 }
304 }
305
306 #[must_use]
308 pub(in crate::db) const fn effective_runtime_filter_program(
309 &self,
310 ) -> Option<&EffectiveRuntimeFilterProgram> {
311 self.static_execution_planning_contract()
312 .residual_filter_contract
313 .effective_runtime_filter_program()
314 }
315
316 #[must_use]
318 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
319 if !self.scalar_plan().distinct {
320 return DistinctExecutionStrategy::None;
321 }
322
323 match distinct_runtime_dedup_strategy(&self.access) {
327 Some(strategy) => strategy,
328 None => DistinctExecutionStrategy::None,
329 }
330 }
331
332 #[cfg(test)]
334 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
335 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
336 }
337
338 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
340 &mut self,
341 schema_info: &SchemaInfo,
342 ) {
343 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
344 }
345
346 #[cfg(test)]
348 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
349 &mut self,
350 model: &EntityModel,
351 ) -> Result<(), InternalError> {
352 self.finalize_static_execution_planning_contract_for_model_with_schema(
353 model,
354 SchemaInfo::cached_for_generated_entity_model(model),
355 )
356 }
357
358 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
360 &mut self,
361 model: &EntityModel,
362 schema_info: &SchemaInfo,
363 ) -> Result<(), InternalError> {
364 self.static_execution_planning_contract = Some(
365 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
366 );
367
368 Ok(())
369 }
370
371 #[must_use]
373 pub(in crate::db) fn execution_shape_signature(
374 &self,
375 entity_path: &'static str,
376 ) -> ExecutionShapeSignature {
377 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
378 }
379
380 #[must_use]
383 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
384 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
385 return self.scalar_plan().predicate.is_some()
386 && !static_contract
387 .residual_filter_contract
388 .has_residual_filter();
389 }
390
391 derive_predicate_fully_satisfied_by_access_contract(self)
392 }
393
394 #[must_use]
396 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
397 self.static_execution_planning_contract()
398 .scalar_projection_plan
399 .as_deref()
400 }
401
402 #[must_use]
404 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
405 self.static_execution_planning_contract.is_some()
406 }
407
408 #[must_use]
410 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
411 self.static_execution_planning_contract()
412 .primary_key_names
413 .iter()
414 .map(String::as_str)
415 .collect()
416 }
417
418 #[must_use]
420 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
421 self.static_execution_planning_contract()
422 .projection_referenced_slots
423 .as_slice()
424 }
425
426 #[must_use]
428 #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
429 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
430 self.static_execution_planning_contract()
431 .projected_slot_mask
432 .as_slice()
433 }
434
435 #[must_use]
437 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
438 self.static_execution_planning_contract()
439 .projection_is_model_identity
440 }
441
442 #[must_use]
444 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
445 self.static_execution_planning_contract()
446 .order_referenced_slots
447 .as_deref()
448 }
449
450 #[must_use]
452 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
453 self.static_execution_planning_contract()
454 .resolved_order
455 .as_ref()
456 }
457
458 #[must_use]
460 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
461 self.static_execution_planning_contract()
462 .slot_map
463 .as_deref()
464 }
465
466 #[must_use]
468 pub(in crate::db) fn grouped_aggregate_execution_specs(
469 &self,
470 ) -> Option<&[GroupedAggregateExecutionSpec]> {
471 self.static_execution_planning_contract()
472 .grouped_aggregate_execution_specs
473 .as_deref()
474 }
475
476 #[must_use]
478 pub(in crate::db) const fn grouped_distinct_execution_strategy(
479 &self,
480 ) -> Option<&GroupedDistinctExecutionStrategy> {
481 self.static_execution_planning_contract()
482 .grouped_distinct_execution_strategy
483 .as_ref()
484 }
485
486 #[must_use]
488 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
489 &self.static_execution_planning_contract().projection_spec
490 }
491
492 #[must_use]
494 #[cfg(any(test, feature = "sql"))]
495 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
496 self.static_execution_planning_contract()
497 .projection_direct_slots
498 .as_deref()
499 }
500
501 #[must_use]
503 #[cfg(any(test, feature = "sql"))]
504 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
505 self.static_execution_planning_contract()
506 .projection_data_row_direct_slots
507 .as_deref()
508 }
509
510 #[must_use]
512 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
513 self.static_execution_planning_contract()
514 .index_compile_targets
515 .as_deref()
516 }
517
518 const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
519 self.static_execution_planning_contract
520 .as_ref()
521 .expect("query semantics invariant")
522 }
523}
524
525fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
526 match access {
527 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
528 Some(DistinctExecutionStrategy::PreOrdered)
529 }
530 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
531 Some(DistinctExecutionStrategy::HashMaterialize)
532 }
533 AccessPlan::Path(_) => None,
534 }
535}
536
537fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
538 let is_grouped_safe = plan
539 .grouped_plan()
540 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
541
542 ContinuationPolicy::new(
543 true, true, is_grouped_safe,
546 )
547}
548
549#[must_use]
551#[cfg(test)]
552pub(in crate::db) fn project_planner_route_profile_for_model(
553 model: &EntityModel,
554 plan: &AccessPlannedQuery,
555) -> PlannerRouteProfile {
556 let primary_key_names = ordered_primary_key_names(model);
557 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
558 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
559 });
560
561 PlannerRouteProfile::new(
562 derive_continuation_policy_validated(plan),
563 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
564 secondary_order_contract,
565 )
566}
567
568#[must_use]
570pub(in crate::db) fn project_planner_route_profile_for_schema(
571 schema_info: &SchemaInfo,
572 plan: &AccessPlannedQuery,
573) -> PlannerRouteProfile {
574 let primary_key_names = primary_key_names_from_schema(schema_info);
575 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
576 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
577 });
578
579 PlannerRouteProfile::new(
580 derive_continuation_policy_validated(plan),
581 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
582 secondary_order_contract,
583 )
584}
585
586fn project_static_execution_planning_contract_for_model(
587 model: &EntityModel,
588 schema_info: &SchemaInfo,
589 plan: &AccessPlannedQuery,
590) -> Result<StaticExecutionPlanningContract, InternalError> {
591 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
592 let execution_preparation_predicate = plan.execution_preparation_predicate();
593 let residual_filter_predicate = derive_residual_filter_predicate(plan);
594 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
595 let execution_preparation_compiled_predicate = should_compile_execution_preparation_predicate(
596 residual_filter_expr.as_ref(),
597 residual_filter_predicate.as_ref(),
598 )
599 .then(|| compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref()))
600 .flatten();
601 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
602 schema_info,
603 residual_filter_expr.as_ref(),
604 residual_filter_predicate.as_ref(),
605 )?;
606 let residual_filter_contract = ResidualFilterContract::new(
607 residual_filter_expr,
608 residual_filter_predicate,
609 effective_runtime_filter_program,
610 );
611 let predicate_pushdown_diagnostics = PredicatePushdownDiagnostics::from_plan(
612 plan.scalar_plan().filter_expr.is_some(),
613 plan.scalar_plan().predicate_covers_filter_expr,
614 plan.scalar_plan().predicate.as_ref(),
615 &plan.access,
616 residual_filter_contract.shape(),
617 );
618 let scalar_projection_plan = if plan.grouped_plan().is_none() {
619 Some(
620 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
621 .ok_or_else(InternalError::query_executor_invariant)?
622 .iter()
623 .map(CompiledExpr::compile)
624 .collect(),
625 )
626 } else {
627 None
628 };
629 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
630 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
631 let projection_direct_slots = lower_direct_projection_slots_with_schema(
632 model,
633 schema_info,
634 &plan.logical,
635 &plan.projection_selection,
636 );
637 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
638 model,
639 schema_info,
640 &plan.logical,
641 &plan.projection_selection,
642 );
643 let projection_referenced_slots =
644 projection_spec.referenced_slots_for_schema(model, schema_info)?;
645 let projected_slot_mask =
646 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
647 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
648 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
649 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
650 let slot_map = slot_map_for_schema_plan(schema_info, plan);
651 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
652
653 Ok(StaticExecutionPlanningContract {
654 primary_key_names: schema_info.primary_key_names().to_vec(),
655 projection_spec,
656 execution_preparation_predicate,
657 execution_preparation_compiled_predicate,
658 residual_filter_contract,
659 predicate_pushdown_diagnostics,
660 scalar_projection_plan,
661 grouped_aggregate_execution_specs,
662 grouped_distinct_execution_strategy,
663 projection_direct_slots,
664 projection_data_row_direct_slots,
665 projection_referenced_slots,
666 projected_slot_mask,
667 projection_is_model_identity,
668 resolved_order,
669 order_referenced_slots,
670 slot_map,
671 index_compile_targets,
672 })
673}
674
675#[cfg(test)]
676fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
677 model.primary_key_names()
678}
679
680fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
681 schema_info
682 .primary_key_names()
683 .iter()
684 .map(String::as_str)
685 .collect()
686}
687
688fn compile_effective_runtime_filter_program(
692 schema_info: &SchemaInfo,
693 residual_filter_expr: Option<&Expr>,
694 residual_filter_predicate: Option<&Predicate>,
695) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
696 if let Some(predicate) = residual_filter_predicate {
701 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
702 PredicateProgram::compile_with_schema_info(schema_info, predicate),
703 )));
704 }
705
706 if let Some(filter_expr) = residual_filter_expr {
707 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
708 .ok_or_else(InternalError::query_invalid_logical_plan)?;
709
710 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
711 CompiledExpr::compile(&compiled),
712 )));
713 }
714
715 Ok(None)
716}
717
718fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
722 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
723
724 match plan.access.selected_index_contract() {
725 Some(index) => {
726 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
727 }
728 None => Some(query_predicate.clone()),
729 }
730}
731
732fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
736 let filtered_residual = derive_execution_preparation_predicate(plan);
737 let filtered_residual = filtered_residual.as_ref()?;
738
739 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
740}
741
742fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
746 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
747 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
748 return None;
749 }
750
751 Some(filter_expr.clone())
752}
753
754fn derive_residual_filter_expr_for_model(
758 model: &EntityModel,
759 plan: &AccessPlannedQuery,
760) -> Option<Expr> {
761 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
762 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
763 return None;
764 }
765
766 Some(filter_expr.clone())
767}
768
769fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
773 match (
774 plan.scalar_plan().filter_expr.as_ref(),
775 plan.scalar_plan().predicate.as_ref(),
776 ) {
777 (None, None) => false,
778 (Some(_), None) => true,
779 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
780 }
781}
782
783fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
786 plan.scalar_plan().predicate.is_some()
787 && derive_residual_filter_predicate(plan).is_none()
788 && derive_residual_filter_expr(plan).is_none()
789}
790
791const fn derive_semantic_filter_fully_satisfied_by_access_contract(
795 plan: &AccessPlannedQuery,
796) -> bool {
797 plan.scalar_plan().filter_expr.is_some()
798 && plan.scalar_plan().predicate.is_some()
799 && plan.scalar_plan().predicate_covers_filter_expr
800}
801
802const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
806 _model: &EntityModel,
807 plan: &AccessPlannedQuery,
808) -> bool {
809 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
810}
811
812fn compile_optional_predicate(
815 schema_info: &SchemaInfo,
816 predicate: Option<&Predicate>,
817) -> Option<PredicateProgram> {
818 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
819}
820
821const fn should_compile_execution_preparation_predicate(
826 residual_filter_expr: Option<&Expr>,
827 residual_filter_predicate: Option<&Predicate>,
828) -> bool {
829 residual_filter_expr.is_some() || residual_filter_predicate.is_some()
830}
831
832fn resolve_grouped_static_planning_semantics(
836 schema_info: &SchemaInfo,
837 plan: &AccessPlannedQuery,
838 projection_spec: &ProjectionSpec,
839) -> Result<
840 (
841 Option<Vec<GroupedAggregateExecutionSpec>>,
842 Option<GroupedDistinctExecutionStrategy>,
843 ),
844 InternalError,
845> {
846 let Some(grouped) = plan.grouped_plan() else {
847 return Ok((None, None));
848 };
849
850 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
851 projection_spec,
852 grouped.group.group_fields.as_slice(),
853 grouped.group.aggregates.as_slice(),
854 )?;
855 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
856
857 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
858 schema_info,
859 aggregate_specs.as_slice(),
860 )?);
861 let grouped_distinct_execution_strategy = Some(
862 resolved_grouped_distinct_execution_strategy_with_schema_info(
863 schema_info,
864 grouped.group.group_fields.as_slice(),
865 grouped.group.aggregates.as_slice(),
866 grouped.having_expr.as_ref(),
867 )?,
868 );
869
870 Ok((
871 grouped_aggregate_execution_specs,
872 grouped_distinct_execution_strategy,
873 ))
874}
875
876fn extend_grouped_having_aggregate_specs(
877 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
878 grouped: &GroupPlan,
879) -> Result<(), InternalError> {
880 if let Some(having_expr) = grouped.having_expr.as_ref() {
881 extend_unique_grouped_aggregate_specs_from_expr(aggregate_specs, having_expr)?;
882 }
883
884 Ok(())
885}
886
887fn projected_slot_mask_for_spec(
888 model: &EntityModel,
889 direct_projection_slots: Option<&[usize]>,
890) -> Vec<bool> {
891 let schema_slot_len = direct_projection_slots
892 .and_then(|slots| slots.iter().copied().max())
893 .map_or(0, |slot| slot.saturating_add(1));
894 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
895
896 let Some(direct_projection_slots) = direct_projection_slots else {
897 return projected_slots;
898 };
899
900 for slot in direct_projection_slots.iter().copied() {
901 if let Some(projected) = projected_slots.get_mut(slot) {
902 *projected = true;
903 }
904 }
905
906 projected_slots
907}
908
909fn resolved_order_for_plan(
910 schema_info: &SchemaInfo,
911 plan: &AccessPlannedQuery,
912) -> Result<Option<ResolvedOrder>, InternalError> {
913 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
914 return Ok(None);
915 }
916
917 let Some(order) = plan.scalar_plan().order.as_ref() else {
918 return Ok(None);
919 };
920
921 let mut fields = Vec::with_capacity(order.fields.len());
922 for term in &order.fields {
923 fields.push(ResolvedOrderField::new(
924 resolved_order_value_source_for_term(schema_info, term)?,
925 term.direction(),
926 ));
927 }
928
929 Ok(Some(ResolvedOrder::new(fields)))
930}
931
932fn resolved_order_value_source_for_term(
933 schema_info: &SchemaInfo,
934 term: &crate::db::query::plan::OrderTerm,
935) -> Result<ResolvedOrderValueSource, InternalError> {
936 if term.direct_field().is_none() {
937 let rendered = term.rendered_label();
938 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
939 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
940 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
941
942 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
943 &compiled,
944 )));
945 }
946
947 let field = term.direct_field().expect("query semantics invariant");
948 let slot = resolve_required_schema_slot(
949 schema_info,
950 field,
951 InternalError::query_invalid_logical_plan,
952 )?;
953
954 Ok(ResolvedOrderValueSource::direct_field(slot))
955}
956
957fn validate_resolved_order_expr_fields(
958 schema_info: &SchemaInfo,
959 expr: &Expr,
960 rendered: &str,
961) -> Result<(), InternalError> {
962 expr.try_for_each_tree_expr(&mut |node| match node {
963 Expr::Field(field_id) => resolve_required_schema_slot(
964 schema_info,
965 field_id.as_str(),
966 InternalError::query_invalid_logical_plan,
967 )
968 .map(|_| ()),
969 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
970 #[cfg(test)]
971 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
972 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
973 _ => Ok(()),
974 })
975}
976
977fn resolve_required_schema_slot<F>(
981 schema_info: &SchemaInfo,
982 field: &str,
983 invalid_plan_error: F,
984) -> Result<usize, InternalError>
985where
986 F: FnOnce() -> InternalError,
987{
988 schema_info
989 .field_slot_index(field)
990 .ok_or_else(invalid_plan_error)
991}
992
993fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
996 InternalError::query_invalid_logical_plan()
997}
998
999fn order_referenced_slots_for_resolved_order(
1004 resolved_order: Option<&ResolvedOrder>,
1005) -> Option<Vec<usize>> {
1006 Some(resolved_order?.referenced_slots())
1007}
1008
1009fn slot_map_for_schema_plan(
1010 schema_info: &SchemaInfo,
1011 plan: &AccessPlannedQuery,
1012) -> Option<Vec<usize>> {
1013 let executable = plan.access.executable_contract();
1014
1015 resolved_index_slots_for_access_path(schema_info, &executable)
1016}
1017
1018fn resolved_index_slots_for_access_path(
1019 schema_info: &SchemaInfo,
1020 access: &ExecutableAccessPlan<'_, crate::value::Value>,
1021) -> Option<Vec<usize>> {
1022 let path = access.as_path()?;
1023 let path_facts = path.shape_facts();
1024 let key_items = path_facts.index_key_items_for_slot_map()?;
1025 let mut slots = Vec::new();
1026
1027 match key_items.key_items() {
1028 SemanticIndexKeyItemsRef::Fields(fields) => {
1029 slots.reserve(fields.len());
1030 for field_name in fields {
1031 let slot = schema_info.field_slot_index(field_name)?;
1032 slots.push(slot);
1033 }
1034 }
1035 SemanticIndexKeyItemsRef::Accepted(items) => {
1036 slots.reserve(items.len());
1037 for key_item in items {
1038 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1039 slots.push(slot);
1040 }
1041 }
1042 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1043 slots.reserve(fields.len());
1044 for &field_name in fields {
1045 let slot = schema_info.field_slot_index(field_name)?;
1046 slots.push(slot);
1047 }
1048 }
1049 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1050 slots.reserve(items.len());
1051 for key_item in items {
1052 let slot = schema_info.field_slot_index(key_item.field())?;
1053 slots.push(slot);
1054 }
1055 }
1056 }
1057
1058 Some(slots)
1059}
1060
1061fn index_compile_targets_for_schema_plan(
1062 schema_info: &SchemaInfo,
1063 plan: &AccessPlannedQuery,
1064) -> Option<Vec<IndexCompileTarget>> {
1065 let executable = plan.access.executable_contract();
1066 let path = executable.as_path()?;
1067 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1068 let mut targets = Vec::new();
1069
1070 match key_items.key_items() {
1071 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1072 return None;
1073 }
1074 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1075 for (component_index, &field_name) in fields.iter().enumerate() {
1076 let field_slot = schema_info.field_slot_index(field_name)?;
1077 targets.push(IndexCompileTarget {
1078 component_index,
1079 field_slot,
1080 key_item: IndexKeyItem::Field(field_name),
1081 });
1082 }
1083 }
1084 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1085 for (component_index, &key_item) in items.iter().enumerate() {
1086 let field_slot = schema_info.field_slot_index(key_item.field())?;
1087 targets.push(IndexCompileTarget {
1088 component_index,
1089 field_slot,
1090 key_item,
1091 });
1092 }
1093 }
1094 }
1095
1096 Some(targets)
1097}