1#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9 db::{
10 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11 predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12 query::plan::{
13 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16 LogicalPlan, PlannerRouteProfile, PredicatePushdownDiagnostics, QueryMode,
17 ResidualFilterContract, ResidualFilterShape, ResolvedOrder, ResolvedOrderField,
18 ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
19 derive_logical_pushdown_eligibility,
20 expr::{
21 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
22 compile_scalar_projection_plan_with_schema,
23 },
24 extend_unique_grouped_aggregate_specs_from_expr, grouped_aggregate_execution_specs,
25 grouped_aggregate_specs_from_projection_spec, grouped_cursor_policy_violation,
26 grouped_plan_strategy, lower_data_row_direct_projection_slots_with_schema,
27 lower_direct_projection_slots_with_schema, lower_projection_identity,
28 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
29 residual_query_predicate_after_filtered_access_contract,
30 resolved_grouped_distinct_execution_strategy_with_schema_info,
31 },
32 schema::SchemaInfo,
33 },
34 error::InternalError,
35 model::{
36 entity::EntityModel,
37 index::{IndexKeyItem, IndexKeyItemsRef},
38 },
39};
40
41impl QueryMode {
42 #[must_use]
44 pub const fn is_load(&self) -> bool {
45 match self {
46 Self::Load(_) => true,
47 Self::Delete(_) => false,
48 }
49 }
50
51 #[must_use]
53 pub const fn is_delete(&self) -> bool {
54 match self {
55 Self::Delete(_) => true,
56 Self::Load(_) => false,
57 }
58 }
59}
60
61impl LogicalPlan {
62 #[must_use]
64 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
65 match self {
66 Self::Scalar(plan) => plan,
67 Self::Grouped(plan) => &plan.scalar,
68 }
69 }
70
71 #[must_use]
73 #[cfg(test)]
74 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
75 match self {
76 Self::Scalar(plan) => plan,
77 Self::Grouped(plan) => &mut plan.scalar,
78 }
79 }
80
81 #[must_use]
83 #[cfg(test)]
84 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
85 self.scalar_semantics()
86 }
87
88 #[must_use]
90 #[cfg(test)]
91 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
92 self.scalar_semantics_mut()
93 }
94}
95
96impl AccessPlannedQuery {
97 #[must_use]
99 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
100 self.logical.scalar_semantics()
101 }
102
103 #[must_use]
106 #[cfg(any(test, feature = "sql"))]
107 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
108 self.scalar_plan().consistency
109 }
110
111 #[must_use]
113 #[cfg(test)]
114 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
115 self.logical.scalar_semantics_mut()
116 }
117
118 #[must_use]
120 #[cfg(test)]
121 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
122 self.scalar_plan()
123 }
124
125 #[must_use]
127 #[cfg(test)]
128 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
129 self.scalar_plan_mut()
130 }
131
132 #[must_use]
134 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
135 match &self.logical {
136 LogicalPlan::Scalar(_) => None,
137 LogicalPlan::Grouped(plan) => Some(plan),
138 }
139 }
140
141 #[must_use]
143 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
144 if let Some(static_contract) = &self.static_execution_planning_contract {
145 return static_contract.projection_spec.clone();
146 }
147
148 lower_projection_intent(model, &self.logical, &self.projection_selection)
149 }
150
151 #[must_use]
153 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
154 lower_projection_identity(&self.logical, &self.projection_selection)
155 }
156
157 #[must_use]
163 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
164 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
165 return static_contract.execution_preparation_predicate.clone();
166 }
167
168 derive_execution_preparation_predicate(self)
169 }
170
171 #[must_use]
175 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
176 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
177 return static_contract
178 .residual_filter_contract
179 .residual_filter_predicate()
180 .cloned();
181 }
182
183 derive_residual_filter_predicate(self)
184 }
185
186 #[must_use]
189 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
190 self.effective_execution_predicate().is_some()
191 }
192
193 #[must_use]
196 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
197 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
198 return static_contract
199 .residual_filter_contract
200 .residual_filter_expr();
201 }
202
203 if !derive_has_residual_filter(self) {
204 return None;
205 }
206
207 self.scalar_plan().filter_expr.as_ref()
208 }
209
210 #[must_use]
213 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
214 self.residual_filter_expr().is_some()
215 }
216
217 #[must_use]
219 pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
220 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
221 return static_contract.residual_filter_contract.shape();
222 }
223
224 ResidualFilterShape::from_presence(
225 self.residual_filter_expr().is_some(),
226 self.effective_execution_predicate().is_some(),
227 )
228 }
229
230 #[must_use]
233 pub(in crate::db) fn predicate_pushdown_label(&self) -> String {
234 self.predicate_pushdown_diagnostics().label()
235 }
236
237 #[must_use]
239 pub(in crate::db) fn predicate_pushdown_diagnostics(&self) -> PredicatePushdownDiagnostics {
240 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
241 return static_contract.predicate_pushdown_diagnostics;
242 }
243
244 derive_predicate_pushdown_diagnostics(self, self.residual_filter_shape())
245 }
246
247 #[must_use]
249 pub(in crate::db) fn predicate_pushdown_outcome_label(&self) -> &'static str {
250 self.predicate_pushdown_diagnostics().outcome_label()
251 }
252
253 #[must_use]
255 pub(in crate::db) fn predicate_pushdown_reason_label(&self) -> &'static str {
256 self.predicate_pushdown_diagnostics().reason_label()
257 }
258
259 #[must_use]
261 pub(in crate::db) fn execution_preparation_compiled_predicate(
262 &self,
263 ) -> Option<&PredicateProgram> {
264 self.static_execution_planning_contract()?
265 .execution_preparation_compiled_predicate
266 .as_ref()
267 }
268
269 #[must_use]
271 pub(in crate::db) fn effective_runtime_compiled_predicate(&self) -> Option<&PredicateProgram> {
272 match self
273 .static_execution_planning_contract()?
274 .residual_filter_contract
275 .effective_runtime_filter_program()
276 {
277 Some(program) => program.predicate_program(),
278 None => None,
279 }
280 }
281
282 #[cfg(test)]
284 #[must_use]
285 pub(in crate::db) fn effective_runtime_compiled_filter_expr(&self) -> Option<&CompiledExpr> {
286 match self
287 .static_execution_planning_contract()?
288 .residual_filter_contract
289 .effective_runtime_filter_program()
290 {
291 Some(program) => program.expression_filter(),
292 None => None,
293 }
294 }
295
296 #[must_use]
298 pub(in crate::db) fn effective_runtime_filter_program(
299 &self,
300 ) -> Option<&EffectiveRuntimeFilterProgram> {
301 self.static_execution_planning_contract()?
302 .residual_filter_contract
303 .effective_runtime_filter_program()
304 }
305
306 #[must_use]
308 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
309 if !self.scalar_plan().distinct {
310 return DistinctExecutionStrategy::None;
311 }
312
313 match distinct_runtime_dedup_strategy(&self.access) {
317 Some(strategy) => strategy,
318 None => DistinctExecutionStrategy::None,
319 }
320 }
321
322 #[cfg(test)]
324 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
325 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
326 }
327
328 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
330 &mut self,
331 schema_info: &SchemaInfo,
332 ) {
333 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
334 }
335
336 #[cfg(test)]
338 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
339 &mut self,
340 model: &EntityModel,
341 ) -> Result<(), InternalError> {
342 self.finalize_static_execution_planning_contract_for_model_with_schema(
343 model,
344 SchemaInfo::cached_for_generated_entity_model(model),
345 )
346 }
347
348 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
350 &mut self,
351 model: &EntityModel,
352 schema_info: &SchemaInfo,
353 ) -> Result<(), InternalError> {
354 self.static_execution_planning_contract = Some(
355 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
356 );
357
358 Ok(())
359 }
360
361 #[must_use]
363 pub(in crate::db) fn execution_shape_signature(
364 &self,
365 entity_path: &'static str,
366 ) -> ExecutionShapeSignature {
367 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
368 }
369
370 #[must_use]
373 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
374 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
375 return self.scalar_plan().predicate.is_some()
376 && !static_contract
377 .residual_filter_contract
378 .has_residual_filter();
379 }
380
381 derive_predicate_fully_satisfied_by_access_contract(self)
382 }
383
384 #[must_use]
386 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
387 self.static_execution_planning_contract()?
388 .scalar_projection_plan
389 .as_deref()
390 }
391
392 #[must_use]
394 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
395 self.static_execution_planning_contract.is_some()
396 }
397
398 pub(in crate::db) fn primary_key_names(&self) -> Result<Vec<&str>, InternalError> {
400 Ok(self
401 .require_static_execution_planning_contract()?
402 .primary_key_names
403 .iter()
404 .map(String::as_str)
405 .collect())
406 }
407
408 pub(in crate::db) fn projection_referenced_slots(&self) -> Result<&[usize], InternalError> {
410 Ok(self
411 .require_static_execution_planning_contract()?
412 .projection_referenced_slots
413 .as_slice())
414 }
415
416 #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
418 pub(in crate::db) fn projected_slot_mask(&self) -> Result<&[bool], InternalError> {
419 Ok(self
420 .require_static_execution_planning_contract()?
421 .projected_slot_mask
422 .as_slice())
423 }
424
425 pub(in crate::db) fn projection_is_model_identity(&self) -> Result<bool, InternalError> {
427 Ok(self
428 .require_static_execution_planning_contract()?
429 .projection_is_model_identity)
430 }
431
432 #[must_use]
434 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
435 self.static_execution_planning_contract()?
436 .order_referenced_slots
437 .as_deref()
438 }
439
440 #[must_use]
442 pub(in crate::db) fn resolved_order(&self) -> Option<&ResolvedOrder> {
443 self.static_execution_planning_contract()?
444 .resolved_order
445 .as_ref()
446 }
447
448 #[must_use]
450 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
451 self.static_execution_planning_contract()?
452 .slot_map
453 .as_deref()
454 }
455
456 #[must_use]
458 pub(in crate::db) fn grouped_aggregate_execution_specs(
459 &self,
460 ) -> Option<&[GroupedAggregateExecutionSpec]> {
461 self.static_execution_planning_contract()?
462 .grouped_aggregate_execution_specs
463 .as_deref()
464 }
465
466 #[must_use]
468 pub(in crate::db) fn grouped_distinct_execution_strategy(
469 &self,
470 ) -> Option<&GroupedDistinctExecutionStrategy> {
471 self.static_execution_planning_contract()?
472 .grouped_distinct_execution_strategy
473 .as_ref()
474 }
475
476 pub(in crate::db) fn frozen_projection_spec(&self) -> Result<&ProjectionSpec, InternalError> {
478 Ok(&self
479 .require_static_execution_planning_contract()?
480 .projection_spec)
481 }
482
483 #[must_use]
485 #[cfg(any(test, feature = "sql"))]
486 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
487 self.static_execution_planning_contract()?
488 .projection_direct_slots
489 .as_deref()
490 }
491
492 #[must_use]
494 #[cfg(any(test, feature = "sql"))]
495 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
496 self.static_execution_planning_contract()?
497 .projection_data_row_direct_slots
498 .as_deref()
499 }
500
501 #[must_use]
503 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
504 self.static_execution_planning_contract()?
505 .index_compile_targets
506 .as_deref()
507 }
508
509 const fn static_execution_planning_contract(&self) -> Option<&StaticExecutionPlanningContract> {
510 self.static_execution_planning_contract.as_ref()
511 }
512
513 fn require_static_execution_planning_contract(
514 &self,
515 ) -> Result<&StaticExecutionPlanningContract, InternalError> {
516 self.static_execution_planning_contract
517 .as_ref()
518 .ok_or_else(InternalError::query_executor_invariant)
519 }
520}
521
522fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
523 match access {
524 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
525 Some(DistinctExecutionStrategy::PreOrdered)
526 }
527 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
528 Some(DistinctExecutionStrategy::HashMaterialize)
529 }
530 AccessPlan::Path(_) => None,
531 }
532}
533
534fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
535 let is_grouped_safe = plan
536 .grouped_plan()
537 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
538
539 ContinuationPolicy::new(
540 true, true, is_grouped_safe,
543 )
544}
545
/// Projects the planner route profile for `plan` using primary key names taken
/// from the entity model (tests only).
///
/// The secondary order contract is computed from the scalar plan's order (when
/// present) against the primary key names, and feeds both the pushdown
/// eligibility derivation and the resulting profile.
#[cfg(test)]
#[must_use]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let pk_names = ordered_primary_key_names(model);
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => order.deterministic_secondary_order_contract_fields(pk_names.as_slice()),
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
564
565#[must_use]
567pub(in crate::db) fn project_planner_route_profile_for_schema(
568 schema_info: &SchemaInfo,
569 plan: &AccessPlannedQuery,
570) -> PlannerRouteProfile {
571 let primary_key_names = primary_key_names_from_schema(schema_info);
572 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
573 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
574 });
575
576 PlannerRouteProfile::new(
577 derive_continuation_policy_validated(plan),
578 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
579 secondary_order_contract,
580 )
581}
582
583fn project_static_execution_planning_contract_for_model(
584 model: &EntityModel,
585 schema_info: &SchemaInfo,
586 plan: &AccessPlannedQuery,
587) -> Result<StaticExecutionPlanningContract, InternalError> {
588 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
589 let execution_preparation_predicate = plan.execution_preparation_predicate();
590 let residual_filter_predicate = derive_residual_filter_predicate_from_preparation(
591 plan,
592 execution_preparation_predicate.as_ref(),
593 );
594 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
595 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
596 schema_info,
597 residual_filter_expr.as_ref(),
598 residual_filter_predicate.as_ref(),
599 )?;
600 let residual_filter_contract = ResidualFilterContract::new(
601 residual_filter_expr,
602 residual_filter_predicate,
603 effective_runtime_filter_program,
604 );
605 let residual_filter_shape = residual_filter_contract.shape();
606 let execution_preparation_compiled_predicate =
607 should_compile_execution_preparation_predicate(residual_filter_shape)
608 .then(|| {
609 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref())
610 })
611 .flatten();
612 let predicate_pushdown_diagnostics =
613 derive_predicate_pushdown_diagnostics(plan, residual_filter_shape);
614 let scalar_projection_plan = if plan.grouped_plan().is_none() {
615 Some(
616 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
617 .ok_or_else(InternalError::query_executor_invariant)?
618 .iter()
619 .map(CompiledExpr::compile)
620 .collect(),
621 )
622 } else {
623 None
624 };
625 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
626 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
627 let projection_direct_slots = lower_direct_projection_slots_with_schema(
628 model,
629 schema_info,
630 &plan.logical,
631 &plan.projection_selection,
632 );
633 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
634 model,
635 schema_info,
636 &plan.logical,
637 &plan.projection_selection,
638 );
639 let projection_referenced_slots =
640 projection_spec.referenced_slots_for_schema(model, schema_info)?;
641 let projected_slot_mask =
642 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
643 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
644 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
645 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
646 let slot_map = slot_map_for_schema_plan(schema_info, plan);
647 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
648
649 Ok(StaticExecutionPlanningContract {
650 primary_key_names: schema_info.primary_key_names().to_vec(),
651 projection_spec,
652 execution_preparation_predicate,
653 execution_preparation_compiled_predicate,
654 residual_filter_contract,
655 predicate_pushdown_diagnostics,
656 scalar_projection_plan,
657 grouped_aggregate_execution_specs,
658 grouped_distinct_execution_strategy,
659 projection_direct_slots,
660 projection_data_row_direct_slots,
661 projection_referenced_slots,
662 projected_slot_mask,
663 projection_is_model_identity,
664 resolved_order,
665 order_referenced_slots,
666 slot_map,
667 index_compile_targets,
668 })
669}
670
/// Returns the primary key names reported by the entity model (tests only).
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    let names = model.primary_key_names();

    names
}
675
676fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
677 schema_info
678 .primary_key_names()
679 .iter()
680 .map(String::as_str)
681 .collect()
682}
683
684fn compile_effective_runtime_filter_program(
688 schema_info: &SchemaInfo,
689 residual_filter_expr: Option<&Expr>,
690 residual_filter_predicate: Option<&Predicate>,
691) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
692 if let Some(predicate) = residual_filter_predicate {
697 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
698 PredicateProgram::compile_with_schema_info(schema_info, predicate),
699 )));
700 }
701
702 if let Some(filter_expr) = residual_filter_expr {
703 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
704 .ok_or_else(InternalError::query_invalid_logical_plan)?;
705
706 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
707 CompiledExpr::compile(&compiled),
708 )));
709 }
710
711 Ok(None)
712}
713
714fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
718 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
719
720 match plan.access.selected_index_contract() {
721 Some(index) => {
722 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
723 }
724 None => Some(query_predicate.clone()),
725 }
726}
727
728fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
732 let filtered_residual = derive_execution_preparation_predicate(plan);
733
734 derive_residual_filter_predicate_from_preparation(plan, filtered_residual.as_ref())
735}
736
737fn derive_residual_filter_predicate_from_preparation(
738 plan: &AccessPlannedQuery,
739 execution_preparation_predicate: Option<&Predicate>,
740) -> Option<Predicate> {
741 let execution_preparation_predicate = execution_preparation_predicate?;
742
743 residual_query_predicate_after_access_path_bounds(
744 plan.access.as_path(),
745 execution_preparation_predicate,
746 )
747}
748
749fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
753 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
754 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
755 return None;
756 }
757
758 Some(filter_expr.clone())
759}
760
761fn derive_residual_filter_expr_for_model(
765 model: &EntityModel,
766 plan: &AccessPlannedQuery,
767) -> Option<Expr> {
768 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
769 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
770 return None;
771 }
772
773 Some(filter_expr.clone())
774}
775
776fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
780 match (
781 plan.scalar_plan().filter_expr.as_ref(),
782 plan.scalar_plan().predicate.as_ref(),
783 ) {
784 (None, None) => false,
785 (Some(_), None) => true,
786 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
787 }
788}
789
790fn derive_predicate_pushdown_diagnostics(
794 plan: &AccessPlannedQuery,
795 residual_filter_shape: ResidualFilterShape,
796) -> PredicatePushdownDiagnostics {
797 PredicatePushdownDiagnostics::from_plan(
798 plan.scalar_plan().filter_expr.is_some(),
799 plan.scalar_plan().predicate_covers_filter_expr,
800 plan.scalar_plan().predicate.as_ref(),
801 &plan.access,
802 residual_filter_shape,
803 )
804}
805
806fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
809 plan.scalar_plan().predicate.is_some()
810 && derive_residual_filter_predicate(plan).is_none()
811 && derive_residual_filter_expr(plan).is_none()
812}
813
814const fn derive_semantic_filter_fully_satisfied_by_access_contract(
818 plan: &AccessPlannedQuery,
819) -> bool {
820 plan.scalar_plan().filter_expr.is_some()
821 && plan.scalar_plan().predicate.is_some()
822 && plan.scalar_plan().predicate_covers_filter_expr
823}
824
825const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
829 _model: &EntityModel,
830 plan: &AccessPlannedQuery,
831) -> bool {
832 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
833}
834
835fn compile_optional_predicate(
838 schema_info: &SchemaInfo,
839 predicate: Option<&Predicate>,
840) -> Option<PredicateProgram> {
841 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
842}
843
844const fn should_compile_execution_preparation_predicate(
849 residual_filter_shape: ResidualFilterShape,
850) -> bool {
851 !residual_filter_shape.is_absent()
852}
853
854fn resolve_grouped_static_planning_semantics(
858 schema_info: &SchemaInfo,
859 plan: &AccessPlannedQuery,
860 projection_spec: &ProjectionSpec,
861) -> Result<
862 (
863 Option<Vec<GroupedAggregateExecutionSpec>>,
864 Option<GroupedDistinctExecutionStrategy>,
865 ),
866 InternalError,
867> {
868 let Some(grouped) = plan.grouped_plan() else {
869 return Ok((None, None));
870 };
871
872 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
873 projection_spec,
874 grouped.group.group_fields.as_slice(),
875 grouped.group.aggregates.as_slice(),
876 )?;
877 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
878
879 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
880 schema_info,
881 aggregate_specs.as_slice(),
882 )?);
883 let grouped_distinct_execution_strategy = Some(
884 resolved_grouped_distinct_execution_strategy_with_schema_info(
885 schema_info,
886 grouped.group.group_fields.as_slice(),
887 grouped.group.aggregates.as_slice(),
888 grouped.having_expr.as_ref(),
889 )?,
890 );
891
892 Ok((
893 grouped_aggregate_execution_specs,
894 grouped_distinct_execution_strategy,
895 ))
896}
897
898fn extend_grouped_having_aggregate_specs(
899 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
900 grouped: &GroupPlan,
901) -> Result<(), InternalError> {
902 if let Some(having_expr) = grouped.having_expr.as_ref() {
903 extend_unique_grouped_aggregate_specs_from_expr(aggregate_specs, having_expr)?;
904 }
905
906 Ok(())
907}
908
909fn projected_slot_mask_for_spec(
910 model: &EntityModel,
911 direct_projection_slots: Option<&[usize]>,
912) -> Vec<bool> {
913 let schema_slot_len = direct_projection_slots
914 .and_then(|slots| slots.iter().copied().max())
915 .map_or(0, |slot| slot.saturating_add(1));
916 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
917
918 let Some(direct_projection_slots) = direct_projection_slots else {
919 return projected_slots;
920 };
921
922 for slot in direct_projection_slots.iter().copied() {
923 if let Some(projected) = projected_slots.get_mut(slot) {
924 *projected = true;
925 }
926 }
927
928 projected_slots
929}
930
931fn resolved_order_for_plan(
932 schema_info: &SchemaInfo,
933 plan: &AccessPlannedQuery,
934) -> Result<Option<ResolvedOrder>, InternalError> {
935 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
936 return Ok(None);
937 }
938
939 let Some(order) = plan.scalar_plan().order.as_ref() else {
940 return Ok(None);
941 };
942
943 let mut fields = Vec::with_capacity(order.fields.len());
944 for term in &order.fields {
945 fields.push(ResolvedOrderField::new(
946 resolved_order_value_source_for_term(schema_info, term)?,
947 term.direction(),
948 ));
949 }
950
951 Ok(Some(ResolvedOrder::new(fields)))
952}
953
954fn resolved_order_value_source_for_term(
955 schema_info: &SchemaInfo,
956 term: &crate::db::query::plan::OrderTerm,
957) -> Result<ResolvedOrderValueSource, InternalError> {
958 if term.direct_field().is_none() {
959 let rendered = term.rendered_label();
960 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
961 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
962 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
963
964 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
965 &compiled,
966 )));
967 }
968
969 let Some(field) = term.direct_field() else {
970 return Err(InternalError::query_invalid_logical_plan());
971 };
972 let slot = resolve_required_schema_slot(
973 schema_info,
974 field,
975 InternalError::query_invalid_logical_plan,
976 )?;
977
978 Ok(ResolvedOrderValueSource::direct_field(slot))
979}
980
981fn validate_resolved_order_expr_fields(
982 schema_info: &SchemaInfo,
983 expr: &Expr,
984 rendered: &str,
985) -> Result<(), InternalError> {
986 expr.try_for_each_tree_expr(&mut |node| match node {
987 Expr::Field(field_id) => resolve_required_schema_slot(
988 schema_info,
989 field_id.as_str(),
990 InternalError::query_invalid_logical_plan,
991 )
992 .map(|_| ()),
993 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
994 #[cfg(test)]
995 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
996 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
997 _ => Ok(()),
998 })
999}
1000
1001fn resolve_required_schema_slot<F>(
1005 schema_info: &SchemaInfo,
1006 field: &str,
1007 invalid_plan_error: F,
1008) -> Result<usize, InternalError>
1009where
1010 F: FnOnce() -> InternalError,
1011{
1012 schema_info
1013 .field_slot_index(field)
1014 .ok_or_else(invalid_plan_error)
1015}
1016
1017fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
1020 InternalError::query_invalid_logical_plan()
1021}
1022
1023fn order_referenced_slots_for_resolved_order(
1028 resolved_order: Option<&ResolvedOrder>,
1029) -> Option<Vec<usize>> {
1030 Some(resolved_order?.referenced_slots())
1031}
1032
1033fn slot_map_for_schema_plan(
1034 schema_info: &SchemaInfo,
1035 plan: &AccessPlannedQuery,
1036) -> Option<Vec<usize>> {
1037 let executable = plan.access.executable_contract();
1038
1039 resolved_index_slots_for_access_path(schema_info, &executable)
1040}
1041
1042fn resolved_index_slots_for_access_path(
1043 schema_info: &SchemaInfo,
1044 access: &ExecutableAccessPlan<'_, crate::value::Value>,
1045) -> Option<Vec<usize>> {
1046 let path = access.as_path()?;
1047 let path_facts = path.shape_facts();
1048 let key_items = path_facts.index_key_items_for_slot_map()?;
1049 let mut slots = Vec::new();
1050
1051 match key_items.key_items() {
1052 SemanticIndexKeyItemsRef::Fields(fields) => {
1053 slots.reserve(fields.len());
1054 for field_name in fields {
1055 let slot = schema_info.field_slot_index(field_name)?;
1056 slots.push(slot);
1057 }
1058 }
1059 SemanticIndexKeyItemsRef::Accepted(items) => {
1060 slots.reserve(items.len());
1061 for key_item in items {
1062 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1063 slots.push(slot);
1064 }
1065 }
1066 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1067 slots.reserve(fields.len());
1068 for &field_name in fields {
1069 let slot = schema_info.field_slot_index(field_name)?;
1070 slots.push(slot);
1071 }
1072 }
1073 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1074 slots.reserve(items.len());
1075 for key_item in items {
1076 let slot = schema_info.field_slot_index(key_item.field())?;
1077 slots.push(slot);
1078 }
1079 }
1080 }
1081
1082 Some(slots)
1083}
1084
1085fn index_compile_targets_for_schema_plan(
1086 schema_info: &SchemaInfo,
1087 plan: &AccessPlannedQuery,
1088) -> Option<Vec<IndexCompileTarget>> {
1089 let executable = plan.access.executable_contract();
1090 let path = executable.as_path()?;
1091 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1092 let mut targets = Vec::new();
1093
1094 match key_items.key_items() {
1095 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1096 return None;
1097 }
1098 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1099 for (component_index, &field_name) in fields.iter().enumerate() {
1100 let field_slot = schema_info.field_slot_index(field_name)?;
1101 targets.push(IndexCompileTarget {
1102 component_index,
1103 field_slot,
1104 key_item: IndexKeyItem::Field(field_name),
1105 });
1106 }
1107 }
1108 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1109 for (component_index, &key_item) in items.iter().enumerate() {
1110 let field_slot = schema_info.field_slot_index(key_item.field())?;
1111 targets.push(IndexCompileTarget {
1112 component_index,
1113 field_slot,
1114 key_item,
1115 });
1116 }
1117 }
1118 }
1119
1120 Some(targets)
1121}