1#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9 db::{
10 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11 predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12 query::plan::{
13 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16 LogicalPlan, PlannerRouteProfile, QueryMode, ResidualFilterContract,
17 ResidualFilterShape, ResolvedOrder, ResolvedOrderField, ResolvedOrderValueSource,
18 ScalarPlan, StaticExecutionPlanningContract, derive_logical_pushdown_eligibility,
19 expr::{
20 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
21 compile_scalar_projection_plan_with_schema,
22 },
23 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24 grouped_cursor_policy_violation, grouped_plan_strategy,
25 lower_data_row_direct_projection_slots_with_schema,
26 lower_direct_projection_slots_with_schema, lower_projection_identity,
27 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
28 residual_query_predicate_after_filtered_access_contract,
29 resolved_grouped_distinct_execution_strategy_with_schema_info,
30 },
31 schema::SchemaInfo,
32 },
33 error::InternalError,
34 model::{
35 entity::EntityModel,
36 index::{IndexKeyItem, IndexKeyItemsRef},
37 },
38};
39
40impl QueryMode {
41 #[must_use]
43 pub const fn is_load(&self) -> bool {
44 match self {
45 Self::Load(_) => true,
46 Self::Delete(_) => false,
47 }
48 }
49
50 #[must_use]
52 pub const fn is_delete(&self) -> bool {
53 match self {
54 Self::Delete(_) => true,
55 Self::Load(_) => false,
56 }
57 }
58}
59
60impl LogicalPlan {
61 #[must_use]
63 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64 match self {
65 Self::Scalar(plan) => plan,
66 Self::Grouped(plan) => &plan.scalar,
67 }
68 }
69
70 #[must_use]
72 #[cfg(test)]
73 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74 match self {
75 Self::Scalar(plan) => plan,
76 Self::Grouped(plan) => &mut plan.scalar,
77 }
78 }
79
80 #[must_use]
82 #[cfg(test)]
83 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84 self.scalar_semantics()
85 }
86
87 #[must_use]
89 #[cfg(test)]
90 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91 self.scalar_semantics_mut()
92 }
93}
94
95impl AccessPlannedQuery {
96 #[must_use]
98 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99 self.logical.scalar_semantics()
100 }
101
102 #[must_use]
105 #[cfg(any(test, feature = "sql"))]
106 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
107 self.scalar_plan().consistency
108 }
109
110 #[must_use]
112 #[cfg(test)]
113 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
114 self.logical.scalar_semantics_mut()
115 }
116
117 #[must_use]
119 #[cfg(test)]
120 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
121 self.scalar_plan()
122 }
123
124 #[must_use]
126 #[cfg(test)]
127 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
128 self.scalar_plan_mut()
129 }
130
131 #[must_use]
133 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
134 match &self.logical {
135 LogicalPlan::Scalar(_) => None,
136 LogicalPlan::Grouped(plan) => Some(plan),
137 }
138 }
139
140 #[must_use]
142 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
143 if let Some(static_contract) = &self.static_execution_planning_contract {
144 return static_contract.projection_spec.clone();
145 }
146
147 lower_projection_intent(model, &self.logical, &self.projection_selection)
148 }
149
150 #[must_use]
152 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
153 lower_projection_identity(&self.logical, &self.projection_selection)
154 }
155
156 #[must_use]
162 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
163 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
164 return static_contract.execution_preparation_predicate.clone();
165 }
166
167 derive_execution_preparation_predicate(self)
168 }
169
170 #[must_use]
174 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
175 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
176 return static_contract
177 .residual_filter_contract
178 .residual_filter_predicate()
179 .cloned();
180 }
181
182 derive_residual_filter_predicate(self)
183 }
184
185 #[must_use]
188 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
189 self.effective_execution_predicate().is_some()
190 }
191
192 #[must_use]
195 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
196 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
197 return static_contract
198 .residual_filter_contract
199 .residual_filter_expr();
200 }
201
202 if !derive_has_residual_filter(self) {
203 return None;
204 }
205
206 self.scalar_plan().filter_expr.as_ref()
207 }
208
209 #[must_use]
212 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
213 self.residual_filter_expr().is_some()
214 }
215
216 #[must_use]
218 pub(in crate::db) fn residual_filter_shape(&self) -> ResidualFilterShape {
219 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
220 return static_contract.residual_filter_contract.shape();
221 }
222
223 ResidualFilterShape::from_presence(
224 self.residual_filter_expr().is_some(),
225 self.effective_execution_predicate().is_some(),
226 )
227 }
228
229 #[must_use]
231 pub(in crate::db) const fn execution_preparation_compiled_predicate(
232 &self,
233 ) -> Option<&PredicateProgram> {
234 self.static_execution_planning_contract()
235 .execution_preparation_compiled_predicate
236 .as_ref()
237 }
238
239 #[must_use]
241 pub(in crate::db) const fn effective_runtime_compiled_predicate(
242 &self,
243 ) -> Option<&PredicateProgram> {
244 match self
245 .static_execution_planning_contract()
246 .residual_filter_contract
247 .effective_runtime_filter_program()
248 {
249 Some(program) => program.predicate_program(),
250 None => None,
251 }
252 }
253
254 #[cfg(test)]
256 #[must_use]
257 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
258 &self,
259 ) -> Option<&CompiledExpr> {
260 match self
261 .static_execution_planning_contract()
262 .residual_filter_contract
263 .effective_runtime_filter_program()
264 {
265 Some(program) => program.expression_filter(),
266 None => None,
267 }
268 }
269
270 #[must_use]
272 pub(in crate::db) const fn effective_runtime_filter_program(
273 &self,
274 ) -> Option<&EffectiveRuntimeFilterProgram> {
275 self.static_execution_planning_contract()
276 .residual_filter_contract
277 .effective_runtime_filter_program()
278 }
279
280 #[must_use]
282 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
283 if !self.scalar_plan().distinct {
284 return DistinctExecutionStrategy::None;
285 }
286
287 match distinct_runtime_dedup_strategy(&self.access) {
291 Some(strategy) => strategy,
292 None => DistinctExecutionStrategy::None,
293 }
294 }
295
296 #[cfg(test)]
298 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
299 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
300 }
301
302 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
304 &mut self,
305 schema_info: &SchemaInfo,
306 ) {
307 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
308 }
309
310 #[cfg(test)]
312 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
313 &mut self,
314 model: &EntityModel,
315 ) -> Result<(), InternalError> {
316 self.finalize_static_execution_planning_contract_for_model_with_schema(
317 model,
318 SchemaInfo::cached_for_generated_entity_model(model),
319 )
320 }
321
322 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
324 &mut self,
325 model: &EntityModel,
326 schema_info: &SchemaInfo,
327 ) -> Result<(), InternalError> {
328 self.static_execution_planning_contract = Some(
329 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
330 );
331
332 Ok(())
333 }
334
335 #[must_use]
337 pub(in crate::db) fn execution_shape_signature(
338 &self,
339 entity_path: &'static str,
340 ) -> ExecutionShapeSignature {
341 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
342 }
343
344 #[must_use]
347 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
348 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
349 return self.scalar_plan().predicate.is_some()
350 && !static_contract
351 .residual_filter_contract
352 .has_residual_filter();
353 }
354
355 derive_predicate_fully_satisfied_by_access_contract(self)
356 }
357
358 #[must_use]
360 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
361 self.static_execution_planning_contract()
362 .scalar_projection_plan
363 .as_deref()
364 }
365
366 #[must_use]
368 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
369 self.static_execution_planning_contract.is_some()
370 }
371
372 #[must_use]
374 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
375 self.static_execution_planning_contract()
376 .primary_key_names
377 .iter()
378 .map(String::as_str)
379 .collect()
380 }
381
382 #[must_use]
384 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
385 self.static_execution_planning_contract()
386 .projection_referenced_slots
387 .as_slice()
388 }
389
390 #[must_use]
392 #[cfg(any(test, all(feature = "sql", feature = "diagnostics")))]
393 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
394 self.static_execution_planning_contract()
395 .projected_slot_mask
396 .as_slice()
397 }
398
399 #[must_use]
401 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
402 self.static_execution_planning_contract()
403 .projection_is_model_identity
404 }
405
406 #[must_use]
408 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
409 self.static_execution_planning_contract()
410 .order_referenced_slots
411 .as_deref()
412 }
413
414 #[must_use]
416 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
417 self.static_execution_planning_contract()
418 .resolved_order
419 .as_ref()
420 }
421
422 #[must_use]
424 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
425 self.static_execution_planning_contract()
426 .slot_map
427 .as_deref()
428 }
429
430 #[must_use]
432 pub(in crate::db) fn grouped_aggregate_execution_specs(
433 &self,
434 ) -> Option<&[GroupedAggregateExecutionSpec]> {
435 self.static_execution_planning_contract()
436 .grouped_aggregate_execution_specs
437 .as_deref()
438 }
439
440 #[must_use]
442 pub(in crate::db) const fn grouped_distinct_execution_strategy(
443 &self,
444 ) -> Option<&GroupedDistinctExecutionStrategy> {
445 self.static_execution_planning_contract()
446 .grouped_distinct_execution_strategy
447 .as_ref()
448 }
449
450 #[must_use]
452 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
453 &self.static_execution_planning_contract().projection_spec
454 }
455
456 #[must_use]
458 #[cfg(any(test, feature = "sql"))]
459 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
460 self.static_execution_planning_contract()
461 .projection_direct_slots
462 .as_deref()
463 }
464
465 #[must_use]
467 #[cfg(any(test, feature = "sql"))]
468 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
469 self.static_execution_planning_contract()
470 .projection_data_row_direct_slots
471 .as_deref()
472 }
473
474 #[must_use]
476 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
477 self.static_execution_planning_contract()
478 .index_compile_targets
479 .as_deref()
480 }
481
482 const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
483 self.static_execution_planning_contract
484 .as_ref()
485 .expect("query semantics invariant")
486 }
487}
488
489fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
490 match access {
491 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
492 Some(DistinctExecutionStrategy::PreOrdered)
493 }
494 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
495 Some(DistinctExecutionStrategy::HashMaterialize)
496 }
497 AccessPlan::Path(_) => None,
498 }
499}
500
501fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
502 let is_grouped_safe = plan
503 .grouped_plan()
504 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
505
506 ContinuationPolicy::new(
507 true, true, is_grouped_safe,
510 )
511}
512
/// Project the planner route profile for `plan`, taking primary-key names from
/// `model` (test builds only).
#[cfg(test)]
#[must_use]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let pk_names = ordered_primary_key_names(model);

    // No ORDER BY means no secondary order contract.
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => order.deterministic_secondary_order_contract_fields(pk_names.as_slice()),
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
531
532#[must_use]
534pub(in crate::db) fn project_planner_route_profile_for_schema(
535 schema_info: &SchemaInfo,
536 plan: &AccessPlannedQuery,
537) -> PlannerRouteProfile {
538 let primary_key_names = primary_key_names_from_schema(schema_info);
539 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
540 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
541 });
542
543 PlannerRouteProfile::new(
544 derive_continuation_policy_validated(plan),
545 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
546 secondary_order_contract,
547 )
548}
549
550fn project_static_execution_planning_contract_for_model(
551 model: &EntityModel,
552 schema_info: &SchemaInfo,
553 plan: &AccessPlannedQuery,
554) -> Result<StaticExecutionPlanningContract, InternalError> {
555 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
556 let execution_preparation_predicate = plan.execution_preparation_predicate();
557 let residual_filter_predicate = derive_residual_filter_predicate(plan);
558 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
559 let execution_preparation_compiled_predicate =
560 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
561 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
562 schema_info,
563 residual_filter_expr.as_ref(),
564 residual_filter_predicate.as_ref(),
565 )?;
566 let residual_filter_contract = ResidualFilterContract::new(
567 residual_filter_expr,
568 residual_filter_predicate,
569 effective_runtime_filter_program,
570 );
571 let scalar_projection_plan = if plan.grouped_plan().is_none() {
572 Some(
573 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
574 .ok_or_else(InternalError::query_executor_invariant)?
575 .iter()
576 .map(CompiledExpr::compile)
577 .collect(),
578 )
579 } else {
580 None
581 };
582 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
583 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
584 let projection_direct_slots = lower_direct_projection_slots_with_schema(
585 model,
586 schema_info,
587 &plan.logical,
588 &plan.projection_selection,
589 );
590 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
591 model,
592 schema_info,
593 &plan.logical,
594 &plan.projection_selection,
595 );
596 let projection_referenced_slots =
597 projection_spec.referenced_slots_for_schema(model, schema_info)?;
598 let projected_slot_mask =
599 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
600 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
601 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
602 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
603 let slot_map = slot_map_for_schema_plan(schema_info, plan);
604 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
605
606 Ok(StaticExecutionPlanningContract {
607 primary_key_names: schema_info.primary_key_names().to_vec(),
608 projection_spec,
609 execution_preparation_predicate,
610 execution_preparation_compiled_predicate,
611 residual_filter_contract,
612 scalar_projection_plan,
613 grouped_aggregate_execution_specs,
614 grouped_distinct_execution_strategy,
615 projection_direct_slots,
616 projection_data_row_direct_slots,
617 projection_referenced_slots,
618 projected_slot_mask,
619 projection_is_model_identity,
620 resolved_order,
621 order_referenced_slots,
622 slot_map,
623 index_compile_targets,
624 })
625}
626
/// Return the primary-key names reported by `model` (test builds only).
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    EntityModel::primary_key_names(model)
}
631
632fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
633 schema_info
634 .primary_key_names()
635 .iter()
636 .map(String::as_str)
637 .collect()
638}
639
640fn compile_effective_runtime_filter_program(
644 schema_info: &SchemaInfo,
645 residual_filter_expr: Option<&Expr>,
646 residual_filter_predicate: Option<&Predicate>,
647) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
648 if let Some(predicate) = residual_filter_predicate {
653 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
654 PredicateProgram::compile_with_schema_info(schema_info, predicate),
655 )));
656 }
657
658 if let Some(filter_expr) = residual_filter_expr {
659 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
660 .ok_or_else(InternalError::query_invalid_logical_plan)?;
661
662 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
663 CompiledExpr::compile(&compiled),
664 )));
665 }
666
667 Ok(None)
668}
669
670fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
674 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
675
676 match plan.access.selected_index_contract() {
677 Some(index) => {
678 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
679 }
680 None => Some(query_predicate.clone()),
681 }
682}
683
684fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
688 let filtered_residual = derive_execution_preparation_predicate(plan);
689 let filtered_residual = filtered_residual.as_ref()?;
690
691 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
692}
693
694fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
698 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
699 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
700 return None;
701 }
702
703 Some(filter_expr.clone())
704}
705
706fn derive_residual_filter_expr_for_model(
710 model: &EntityModel,
711 plan: &AccessPlannedQuery,
712) -> Option<Expr> {
713 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
714 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
715 return None;
716 }
717
718 Some(filter_expr.clone())
719}
720
721fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
725 match (
726 plan.scalar_plan().filter_expr.as_ref(),
727 plan.scalar_plan().predicate.as_ref(),
728 ) {
729 (None, None) => false,
730 (Some(_), None) => true,
731 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
732 }
733}
734
735fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
738 plan.scalar_plan().predicate.is_some()
739 && derive_residual_filter_predicate(plan).is_none()
740 && derive_residual_filter_expr(plan).is_none()
741}
742
743const fn derive_semantic_filter_fully_satisfied_by_access_contract(
747 plan: &AccessPlannedQuery,
748) -> bool {
749 plan.scalar_plan().filter_expr.is_some()
750 && plan.scalar_plan().predicate.is_some()
751 && plan.scalar_plan().predicate_covers_filter_expr
752}
753
754const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
758 _model: &EntityModel,
759 plan: &AccessPlannedQuery,
760) -> bool {
761 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
762}
763
764fn compile_optional_predicate(
767 schema_info: &SchemaInfo,
768 predicate: Option<&Predicate>,
769) -> Option<PredicateProgram> {
770 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
771}
772
773fn resolve_grouped_static_planning_semantics(
777 schema_info: &SchemaInfo,
778 plan: &AccessPlannedQuery,
779 projection_spec: &ProjectionSpec,
780) -> Result<
781 (
782 Option<Vec<GroupedAggregateExecutionSpec>>,
783 Option<GroupedDistinctExecutionStrategy>,
784 ),
785 InternalError,
786> {
787 let Some(grouped) = plan.grouped_plan() else {
788 return Ok((None, None));
789 };
790
791 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
792 projection_spec,
793 grouped.group.group_fields.as_slice(),
794 grouped.group.aggregates.as_slice(),
795 )?;
796 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
797
798 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
799 schema_info,
800 aggregate_specs.as_slice(),
801 )?);
802 let grouped_distinct_execution_strategy = Some(
803 resolved_grouped_distinct_execution_strategy_with_schema_info(
804 schema_info,
805 grouped.group.group_fields.as_slice(),
806 grouped.group.aggregates.as_slice(),
807 grouped.having_expr.as_ref(),
808 )?,
809 );
810
811 Ok((
812 grouped_aggregate_execution_specs,
813 grouped_distinct_execution_strategy,
814 ))
815}
816
817fn extend_grouped_having_aggregate_specs(
818 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
819 grouped: &GroupPlan,
820) -> Result<(), InternalError> {
821 if let Some(having_expr) = grouped.having_expr.as_ref() {
822 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
823 }
824
825 Ok(())
826}
827
828fn collect_grouped_having_expr_aggregate_specs(
829 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
830 expr: &Expr,
831) -> Result<(), InternalError> {
832 if !expr.contains_aggregate() {
833 return Ok(());
834 }
835
836 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
837 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
838
839 if aggregate_specs
840 .iter()
841 .all(|current| current != &aggregate_spec)
842 {
843 aggregate_specs.push(aggregate_spec);
844 }
845
846 Ok(())
847 })
848}
849
850fn projected_slot_mask_for_spec(
851 model: &EntityModel,
852 direct_projection_slots: Option<&[usize]>,
853) -> Vec<bool> {
854 let schema_slot_len = direct_projection_slots
855 .and_then(|slots| slots.iter().copied().max())
856 .map_or(0, |slot| slot.saturating_add(1));
857 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
858
859 let Some(direct_projection_slots) = direct_projection_slots else {
860 return projected_slots;
861 };
862
863 for slot in direct_projection_slots.iter().copied() {
864 if let Some(projected) = projected_slots.get_mut(slot) {
865 *projected = true;
866 }
867 }
868
869 projected_slots
870}
871
872fn resolved_order_for_plan(
873 schema_info: &SchemaInfo,
874 plan: &AccessPlannedQuery,
875) -> Result<Option<ResolvedOrder>, InternalError> {
876 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
877 return Ok(None);
878 }
879
880 let Some(order) = plan.scalar_plan().order.as_ref() else {
881 return Ok(None);
882 };
883
884 let mut fields = Vec::with_capacity(order.fields.len());
885 for term in &order.fields {
886 fields.push(ResolvedOrderField::new(
887 resolved_order_value_source_for_term(schema_info, term)?,
888 term.direction(),
889 ));
890 }
891
892 Ok(Some(ResolvedOrder::new(fields)))
893}
894
895fn resolved_order_value_source_for_term(
896 schema_info: &SchemaInfo,
897 term: &crate::db::query::plan::OrderTerm,
898) -> Result<ResolvedOrderValueSource, InternalError> {
899 if term.direct_field().is_none() {
900 let rendered = term.rendered_label();
901 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
902 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
903 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
904
905 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
906 &compiled,
907 )));
908 }
909
910 let field = term.direct_field().expect("query semantics invariant");
911 let slot = resolve_required_schema_slot(
912 schema_info,
913 field,
914 InternalError::query_invalid_logical_plan,
915 )?;
916
917 Ok(ResolvedOrderValueSource::direct_field(slot))
918}
919
920fn validate_resolved_order_expr_fields(
921 schema_info: &SchemaInfo,
922 expr: &Expr,
923 rendered: &str,
924) -> Result<(), InternalError> {
925 expr.try_for_each_tree_expr(&mut |node| match node {
926 Expr::Field(field_id) => resolve_required_schema_slot(
927 schema_info,
928 field_id.as_str(),
929 InternalError::query_invalid_logical_plan,
930 )
931 .map(|_| ()),
932 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
933 #[cfg(test)]
934 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
935 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
936 _ => Ok(()),
937 })
938}
939
940fn resolve_required_schema_slot<F>(
944 schema_info: &SchemaInfo,
945 field: &str,
946 invalid_plan_error: F,
947) -> Result<usize, InternalError>
948where
949 F: FnOnce() -> InternalError,
950{
951 schema_info
952 .field_slot_index(field)
953 .ok_or_else(invalid_plan_error)
954}
955
956fn order_expression_scalar_seam_error(_rendered: &str) -> InternalError {
959 InternalError::query_invalid_logical_plan()
960}
961
962fn order_referenced_slots_for_resolved_order(
967 resolved_order: Option<&ResolvedOrder>,
968) -> Option<Vec<usize>> {
969 Some(resolved_order?.referenced_slots())
970}
971
972fn slot_map_for_schema_plan(
973 schema_info: &SchemaInfo,
974 plan: &AccessPlannedQuery,
975) -> Option<Vec<usize>> {
976 let executable = plan.access.executable_contract();
977
978 resolved_index_slots_for_access_path(schema_info, &executable)
979}
980
981fn resolved_index_slots_for_access_path(
982 schema_info: &SchemaInfo,
983 access: &ExecutableAccessPlan<'_, crate::value::Value>,
984) -> Option<Vec<usize>> {
985 let path = access.as_path()?;
986 let path_facts = path.shape_facts();
987 let key_items = path_facts.index_key_items_for_slot_map()?;
988 let mut slots = Vec::new();
989
990 match key_items.key_items() {
991 SemanticIndexKeyItemsRef::Fields(fields) => {
992 slots.reserve(fields.len());
993 for field_name in fields {
994 let slot = schema_info.field_slot_index(field_name)?;
995 slots.push(slot);
996 }
997 }
998 SemanticIndexKeyItemsRef::Accepted(items) => {
999 slots.reserve(items.len());
1000 for key_item in items {
1001 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
1002 slots.push(slot);
1003 }
1004 }
1005 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1006 slots.reserve(fields.len());
1007 for &field_name in fields {
1008 let slot = schema_info.field_slot_index(field_name)?;
1009 slots.push(slot);
1010 }
1011 }
1012 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1013 slots.reserve(items.len());
1014 for key_item in items {
1015 let slot = schema_info.field_slot_index(key_item.field())?;
1016 slots.push(slot);
1017 }
1018 }
1019 }
1020
1021 Some(slots)
1022}
1023
1024fn index_compile_targets_for_schema_plan(
1025 schema_info: &SchemaInfo,
1026 plan: &AccessPlannedQuery,
1027) -> Option<Vec<IndexCompileTarget>> {
1028 let executable = plan.access.executable_contract();
1029 let path = executable.as_path()?;
1030 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1031 let mut targets = Vec::new();
1032
1033 match key_items.key_items() {
1034 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1035 return None;
1036 }
1037 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1038 for (component_index, &field_name) in fields.iter().enumerate() {
1039 let field_slot = schema_info.field_slot_index(field_name)?;
1040 targets.push(IndexCompileTarget {
1041 component_index,
1042 field_slot,
1043 key_item: IndexKeyItem::Field(field_name),
1044 });
1045 }
1046 }
1047 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1048 for (component_index, &key_item) in items.iter().enumerate() {
1049 let field_slot = schema_info.field_slot_index(key_item.field())?;
1050 targets.push(IndexCompileTarget {
1051 component_index,
1052 field_slot,
1053 key_item,
1054 });
1055 }
1056 }
1057 }
1058
1059 Some(targets)
1060}