1#[cfg(any(test, feature = "sql"))]
7use crate::db::predicate::MissingRowPolicy;
8use crate::{
9 db::{
10 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
11 predicate::{IndexCompileTarget, Predicate, PredicateProgram},
12 query::plan::{
13 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
14 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
15 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
16 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
17 ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
18 derive_logical_pushdown_eligibility,
19 expr::{
20 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
21 compile_scalar_projection_plan_with_schema,
22 },
23 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24 grouped_cursor_policy_violation, grouped_plan_strategy,
25 lower_data_row_direct_projection_slots_with_schema,
26 lower_direct_projection_slots_with_schema, lower_projection_identity,
27 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
28 residual_query_predicate_after_filtered_access_contract,
29 resolved_grouped_distinct_execution_strategy_with_schema_info,
30 },
31 schema::SchemaInfo,
32 },
33 error::InternalError,
34 model::{
35 entity::EntityModel,
36 index::{IndexKeyItem, IndexKeyItemsRef},
37 },
38};
39
40impl QueryMode {
41 #[must_use]
43 pub const fn is_load(&self) -> bool {
44 match self {
45 Self::Load(_) => true,
46 Self::Delete(_) => false,
47 }
48 }
49
50 #[must_use]
52 pub const fn is_delete(&self) -> bool {
53 match self {
54 Self::Delete(_) => true,
55 Self::Load(_) => false,
56 }
57 }
58}
59
60impl LogicalPlan {
61 #[must_use]
63 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
64 match self {
65 Self::Scalar(plan) => plan,
66 Self::Grouped(plan) => &plan.scalar,
67 }
68 }
69
70 #[must_use]
72 #[cfg(test)]
73 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
74 match self {
75 Self::Scalar(plan) => plan,
76 Self::Grouped(plan) => &mut plan.scalar,
77 }
78 }
79
80 #[must_use]
82 #[cfg(test)]
83 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
84 self.scalar_semantics()
85 }
86
87 #[must_use]
89 #[cfg(test)]
90 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
91 self.scalar_semantics_mut()
92 }
93}
94
95impl AccessPlannedQuery {
96 #[must_use]
98 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
99 self.logical.scalar_semantics()
100 }
101
102 #[must_use]
105 #[cfg(any(test, feature = "sql"))]
106 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
107 self.scalar_plan().consistency
108 }
109
110 #[must_use]
112 #[cfg(test)]
113 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
114 self.logical.scalar_semantics_mut()
115 }
116
117 #[must_use]
119 #[cfg(test)]
120 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
121 self.scalar_plan()
122 }
123
124 #[must_use]
126 #[cfg(test)]
127 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
128 self.scalar_plan_mut()
129 }
130
131 #[must_use]
133 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
134 match &self.logical {
135 LogicalPlan::Scalar(_) => None,
136 LogicalPlan::Grouped(plan) => Some(plan),
137 }
138 }
139
140 #[must_use]
142 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
143 if let Some(static_contract) = &self.static_execution_planning_contract {
144 return static_contract.projection_spec.clone();
145 }
146
147 lower_projection_intent(model, &self.logical, &self.projection_selection)
148 }
149
150 #[must_use]
152 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
153 lower_projection_identity(&self.logical, &self.projection_selection)
154 }
155
156 #[must_use]
162 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
163 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
164 return static_contract.execution_preparation_predicate.clone();
165 }
166
167 derive_execution_preparation_predicate(self)
168 }
169
170 #[must_use]
174 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
175 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
176 return static_contract.residual_filter_predicate.clone();
177 }
178
179 derive_residual_filter_predicate(self)
180 }
181
182 #[must_use]
185 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
186 self.effective_execution_predicate().is_some()
187 }
188
189 #[must_use]
192 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
193 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
194 return static_contract.residual_filter_expr.as_ref();
195 }
196
197 if !derive_has_residual_filter(self) {
198 return None;
199 }
200
201 self.scalar_plan().filter_expr.as_ref()
202 }
203
204 #[must_use]
207 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
208 self.residual_filter_expr().is_some()
209 }
210
211 #[must_use]
213 pub(in crate::db) const fn execution_preparation_compiled_predicate(
214 &self,
215 ) -> Option<&PredicateProgram> {
216 self.static_execution_planning_contract()
217 .execution_preparation_compiled_predicate
218 .as_ref()
219 }
220
221 #[must_use]
223 pub(in crate::db) const fn effective_runtime_compiled_predicate(
224 &self,
225 ) -> Option<&PredicateProgram> {
226 match self
227 .static_execution_planning_contract()
228 .effective_runtime_filter_program
229 .as_ref()
230 {
231 Some(program) => program.predicate_program(),
232 None => None,
233 }
234 }
235
236 #[cfg(test)]
238 #[must_use]
239 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
240 &self,
241 ) -> Option<&CompiledExpr> {
242 match self
243 .static_execution_planning_contract()
244 .effective_runtime_filter_program
245 .as_ref()
246 {
247 Some(program) => program.expression_filter(),
248 None => None,
249 }
250 }
251
252 #[must_use]
254 pub(in crate::db) const fn effective_runtime_filter_program(
255 &self,
256 ) -> Option<&EffectiveRuntimeFilterProgram> {
257 self.static_execution_planning_contract()
258 .effective_runtime_filter_program
259 .as_ref()
260 }
261
262 #[must_use]
264 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
265 if !self.scalar_plan().distinct {
266 return DistinctExecutionStrategy::None;
267 }
268
269 match distinct_runtime_dedup_strategy(&self.access) {
273 Some(strategy) => strategy,
274 None => DistinctExecutionStrategy::None,
275 }
276 }
277
278 #[cfg(test)]
280 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
281 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
282 }
283
284 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
286 &mut self,
287 schema_info: &SchemaInfo,
288 ) {
289 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
290 }
291
292 #[cfg(test)]
294 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
295 &mut self,
296 model: &EntityModel,
297 ) -> Result<(), InternalError> {
298 self.finalize_static_execution_planning_contract_for_model_with_schema(
299 model,
300 SchemaInfo::cached_for_generated_entity_model(model),
301 )
302 }
303
304 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
306 &mut self,
307 model: &EntityModel,
308 schema_info: &SchemaInfo,
309 ) -> Result<(), InternalError> {
310 self.static_execution_planning_contract = Some(
311 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
312 );
313
314 Ok(())
315 }
316
317 #[must_use]
319 pub(in crate::db) fn execution_shape_signature(
320 &self,
321 entity_path: &'static str,
322 ) -> ExecutionShapeSignature {
323 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
324 }
325
326 #[must_use]
329 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
330 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
331 return self.scalar_plan().predicate.is_some()
332 && static_contract.residual_filter_predicate.is_none()
333 && static_contract.residual_filter_expr.is_none();
334 }
335
336 derive_predicate_fully_satisfied_by_access_contract(self)
337 }
338
339 #[must_use]
341 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
342 self.static_execution_planning_contract()
343 .scalar_projection_plan
344 .as_deref()
345 }
346
347 #[must_use]
349 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
350 self.static_execution_planning_contract.is_some()
351 }
352
353 #[must_use]
355 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
356 self.static_execution_planning_contract()
357 .primary_key_names
358 .iter()
359 .map(String::as_str)
360 .collect()
361 }
362
363 #[must_use]
365 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
366 self.static_execution_planning_contract()
367 .projection_referenced_slots
368 .as_slice()
369 }
370
371 #[must_use]
373 #[cfg(any(test, feature = "diagnostics"))]
374 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
375 self.static_execution_planning_contract()
376 .projected_slot_mask
377 .as_slice()
378 }
379
380 #[must_use]
382 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
383 self.static_execution_planning_contract()
384 .projection_is_model_identity
385 }
386
387 #[must_use]
389 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
390 self.static_execution_planning_contract()
391 .order_referenced_slots
392 .as_deref()
393 }
394
395 #[must_use]
397 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
398 self.static_execution_planning_contract()
399 .resolved_order
400 .as_ref()
401 }
402
403 #[must_use]
405 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
406 self.static_execution_planning_contract()
407 .slot_map
408 .as_deref()
409 }
410
411 #[must_use]
413 pub(in crate::db) fn grouped_aggregate_execution_specs(
414 &self,
415 ) -> Option<&[GroupedAggregateExecutionSpec]> {
416 self.static_execution_planning_contract()
417 .grouped_aggregate_execution_specs
418 .as_deref()
419 }
420
421 #[must_use]
423 pub(in crate::db) const fn grouped_distinct_execution_strategy(
424 &self,
425 ) -> Option<&GroupedDistinctExecutionStrategy> {
426 self.static_execution_planning_contract()
427 .grouped_distinct_execution_strategy
428 .as_ref()
429 }
430
431 #[must_use]
433 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
434 &self.static_execution_planning_contract().projection_spec
435 }
436
437 #[must_use]
439 #[cfg(any(test, feature = "sql"))]
440 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
441 self.static_execution_planning_contract()
442 .projection_direct_slots
443 .as_deref()
444 }
445
446 #[must_use]
448 #[cfg(any(test, feature = "sql"))]
449 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
450 self.static_execution_planning_contract()
451 .projection_data_row_direct_slots
452 .as_deref()
453 }
454
455 #[must_use]
457 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
458 self.static_execution_planning_contract()
459 .index_compile_targets
460 .as_deref()
461 }
462
463 const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
464 self.static_execution_planning_contract
465 .as_ref()
466 .expect("access-planned queries must freeze static execution planning contract before execution")
467 }
468}
469
470fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
471 match access {
472 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
473 Some(DistinctExecutionStrategy::PreOrdered)
474 }
475 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
476 Some(DistinctExecutionStrategy::HashMaterialize)
477 }
478 AccessPlan::Path(_) => None,
479 }
480}
481
482fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
483 let is_grouped_safe = plan
484 .grouped_plan()
485 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
486
487 ContinuationPolicy::new(
488 true, true, is_grouped_safe,
491 )
492}
493
494#[must_use]
496#[cfg(test)]
497pub(in crate::db) fn project_planner_route_profile_for_model(
498 model: &EntityModel,
499 plan: &AccessPlannedQuery,
500) -> PlannerRouteProfile {
501 let primary_key_names = ordered_primary_key_names(model);
502 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
503 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
504 });
505
506 PlannerRouteProfile::new(
507 derive_continuation_policy_validated(plan),
508 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
509 secondary_order_contract,
510 )
511}
512
513#[must_use]
515pub(in crate::db) fn project_planner_route_profile_for_schema(
516 schema_info: &SchemaInfo,
517 plan: &AccessPlannedQuery,
518) -> PlannerRouteProfile {
519 let primary_key_names = primary_key_names_from_schema(schema_info);
520 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
521 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
522 });
523
524 PlannerRouteProfile::new(
525 derive_continuation_policy_validated(plan),
526 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
527 secondary_order_contract,
528 )
529}
530
531fn project_static_execution_planning_contract_for_model(
532 model: &EntityModel,
533 schema_info: &SchemaInfo,
534 plan: &AccessPlannedQuery,
535) -> Result<StaticExecutionPlanningContract, InternalError> {
536 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
537 let execution_preparation_predicate = plan.execution_preparation_predicate();
538 let residual_filter_predicate = derive_residual_filter_predicate(plan);
539 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
540 let execution_preparation_compiled_predicate =
541 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
542 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
543 schema_info,
544 residual_filter_expr.as_ref(),
545 residual_filter_predicate.as_ref(),
546 )?;
547 let scalar_projection_plan = if plan.grouped_plan().is_none() {
548 Some(
549 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
550 .ok_or_else(|| {
551 InternalError::query_executor_invariant(
552 "scalar projection program must compile during static planning finalization",
553 )
554 })?
555 .iter()
556 .map(CompiledExpr::compile)
557 .collect(),
558 )
559 } else {
560 None
561 };
562 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
563 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
564 let projection_direct_slots = lower_direct_projection_slots_with_schema(
565 model,
566 schema_info,
567 &plan.logical,
568 &plan.projection_selection,
569 );
570 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
571 model,
572 schema_info,
573 &plan.logical,
574 &plan.projection_selection,
575 );
576 let projection_referenced_slots =
577 projection_spec.referenced_slots_for_schema(model, schema_info)?;
578 let projected_slot_mask =
579 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
580 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
581 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
582 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
583 let slot_map = slot_map_for_schema_plan(schema_info, plan);
584 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
585
586 Ok(StaticExecutionPlanningContract {
587 primary_key_names: schema_info.primary_key_names().to_vec(),
588 projection_spec,
589 execution_preparation_predicate,
590 residual_filter_expr,
591 residual_filter_predicate,
592 execution_preparation_compiled_predicate,
593 effective_runtime_filter_program,
594 scalar_projection_plan,
595 grouped_aggregate_execution_specs,
596 grouped_distinct_execution_strategy,
597 projection_direct_slots,
598 projection_data_row_direct_slots,
599 projection_referenced_slots,
600 projected_slot_mask,
601 projection_is_model_identity,
602 resolved_order,
603 order_referenced_slots,
604 slot_map,
605 index_compile_targets,
606 })
607}
608
609#[cfg(test)]
610fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
611 model.primary_key_names()
612}
613
614fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
615 schema_info
616 .primary_key_names()
617 .iter()
618 .map(String::as_str)
619 .collect()
620}
621
622fn compile_effective_runtime_filter_program(
626 schema_info: &SchemaInfo,
627 residual_filter_expr: Option<&Expr>,
628 residual_filter_predicate: Option<&Predicate>,
629) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
630 if let Some(predicate) = residual_filter_predicate {
635 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
636 PredicateProgram::compile_with_schema_info(schema_info, predicate),
637 )));
638 }
639
640 if let Some(filter_expr) = residual_filter_expr {
641 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
642 .ok_or_else(|| {
643 InternalError::query_invalid_logical_plan(
644 "effective runtime scalar filter expression must compile during static planning finalization",
645 )
646 })?;
647
648 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
649 CompiledExpr::compile(&compiled),
650 )));
651 }
652
653 Ok(None)
654}
655
656fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
660 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
661
662 match plan.access.selected_index_contract() {
663 Some(index) => {
664 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
665 }
666 None => Some(query_predicate.clone()),
667 }
668}
669
670fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
674 let filtered_residual = derive_execution_preparation_predicate(plan);
675 let filtered_residual = filtered_residual.as_ref()?;
676
677 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
678}
679
680fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
684 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
685 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
686 return None;
687 }
688
689 Some(filter_expr.clone())
690}
691
692fn derive_residual_filter_expr_for_model(
696 model: &EntityModel,
697 plan: &AccessPlannedQuery,
698) -> Option<Expr> {
699 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
700 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
701 return None;
702 }
703
704 Some(filter_expr.clone())
705}
706
707fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
711 match (
712 plan.scalar_plan().filter_expr.as_ref(),
713 plan.scalar_plan().predicate.as_ref(),
714 ) {
715 (None, None) => false,
716 (Some(_), None) => true,
717 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
718 }
719}
720
721fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
724 plan.scalar_plan().predicate.is_some()
725 && derive_residual_filter_predicate(plan).is_none()
726 && derive_residual_filter_expr(plan).is_none()
727}
728
729const fn derive_semantic_filter_fully_satisfied_by_access_contract(
733 plan: &AccessPlannedQuery,
734) -> bool {
735 plan.scalar_plan().filter_expr.is_some()
736 && plan.scalar_plan().predicate.is_some()
737 && plan.scalar_plan().predicate_covers_filter_expr
738}
739
740const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
744 _model: &EntityModel,
745 plan: &AccessPlannedQuery,
746) -> bool {
747 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
748}
749
750fn compile_optional_predicate(
753 schema_info: &SchemaInfo,
754 predicate: Option<&Predicate>,
755) -> Option<PredicateProgram> {
756 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
757}
758
759fn resolve_grouped_static_planning_semantics(
763 schema_info: &SchemaInfo,
764 plan: &AccessPlannedQuery,
765 projection_spec: &ProjectionSpec,
766) -> Result<
767 (
768 Option<Vec<GroupedAggregateExecutionSpec>>,
769 Option<GroupedDistinctExecutionStrategy>,
770 ),
771 InternalError,
772> {
773 let Some(grouped) = plan.grouped_plan() else {
774 return Ok((None, None));
775 };
776
777 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
778 projection_spec,
779 grouped.group.group_fields.as_slice(),
780 grouped.group.aggregates.as_slice(),
781 )?;
782 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
783
784 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
785 schema_info,
786 aggregate_specs.as_slice(),
787 )?);
788 let grouped_distinct_execution_strategy = Some(
789 resolved_grouped_distinct_execution_strategy_with_schema_info(
790 schema_info,
791 grouped.group.group_fields.as_slice(),
792 grouped.group.aggregates.as_slice(),
793 grouped.having_expr.as_ref(),
794 )?,
795 );
796
797 Ok((
798 grouped_aggregate_execution_specs,
799 grouped_distinct_execution_strategy,
800 ))
801}
802
803fn extend_grouped_having_aggregate_specs(
804 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
805 grouped: &GroupPlan,
806) -> Result<(), InternalError> {
807 if let Some(having_expr) = grouped.having_expr.as_ref() {
808 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
809 }
810
811 Ok(())
812}
813
814fn collect_grouped_having_expr_aggregate_specs(
815 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
816 expr: &Expr,
817) -> Result<(), InternalError> {
818 if !expr.contains_aggregate() {
819 return Ok(());
820 }
821
822 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
823 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
824
825 if aggregate_specs
826 .iter()
827 .all(|current| current != &aggregate_spec)
828 {
829 aggregate_specs.push(aggregate_spec);
830 }
831
832 Ok(())
833 })
834}
835
836fn projected_slot_mask_for_spec(
837 model: &EntityModel,
838 direct_projection_slots: Option<&[usize]>,
839) -> Vec<bool> {
840 let schema_slot_len = direct_projection_slots
841 .and_then(|slots| slots.iter().copied().max())
842 .map_or(0, |slot| slot.saturating_add(1));
843 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
844
845 let Some(direct_projection_slots) = direct_projection_slots else {
846 return projected_slots;
847 };
848
849 for slot in direct_projection_slots.iter().copied() {
850 if let Some(projected) = projected_slots.get_mut(slot) {
851 *projected = true;
852 }
853 }
854
855 projected_slots
856}
857
858fn resolved_order_for_plan(
859 schema_info: &SchemaInfo,
860 plan: &AccessPlannedQuery,
861) -> Result<Option<ResolvedOrder>, InternalError> {
862 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
863 return Ok(None);
864 }
865
866 let Some(order) = plan.scalar_plan().order.as_ref() else {
867 return Ok(None);
868 };
869
870 let mut fields = Vec::with_capacity(order.fields.len());
871 for term in &order.fields {
872 fields.push(ResolvedOrderField::new(
873 resolved_order_value_source_for_term(schema_info, term)?,
874 term.direction(),
875 ));
876 }
877
878 Ok(Some(ResolvedOrder::new(fields)))
879}
880
881fn resolved_order_value_source_for_term(
882 schema_info: &SchemaInfo,
883 term: &crate::db::query::plan::OrderTerm,
884) -> Result<ResolvedOrderValueSource, InternalError> {
885 if term.direct_field().is_none() {
886 let rendered = term.rendered_label();
887 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
888 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
889 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
890
891 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
892 &compiled,
893 )));
894 }
895
896 let field = term
897 .direct_field()
898 .expect("direct-field order branch should only execute for field-backed terms");
899 let slot = resolve_required_schema_slot(schema_info, field, || {
900 InternalError::query_invalid_logical_plan(format!(
901 "order expression references unknown field '{field}'",
902 ))
903 })?;
904
905 Ok(ResolvedOrderValueSource::direct_field(slot))
906}
907
908fn validate_resolved_order_expr_fields(
909 schema_info: &SchemaInfo,
910 expr: &Expr,
911 rendered: &str,
912) -> Result<(), InternalError> {
913 expr.try_for_each_tree_expr(&mut |node| match node {
914 Expr::Field(field_id) => {
915 resolve_required_schema_slot(schema_info, field_id.as_str(), || {
916 InternalError::query_invalid_logical_plan(format!(
917 "order expression references unknown field '{rendered}'",
918 ))
919 })
920 .map(|_| ())
921 }
922 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
923 #[cfg(test)]
924 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
925 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
926 _ => Ok(()),
927 })
928}
929
930fn resolve_required_schema_slot<F>(
934 schema_info: &SchemaInfo,
935 field: &str,
936 invalid_plan_error: F,
937) -> Result<usize, InternalError>
938where
939 F: FnOnce() -> InternalError,
940{
941 schema_info
942 .field_slot_index(field)
943 .ok_or_else(invalid_plan_error)
944}
945
946fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
949 InternalError::query_invalid_logical_plan(format!(
950 "order expression '{rendered}' did not stay on the scalar expression seam",
951 ))
952}
953
954fn order_referenced_slots_for_resolved_order(
959 resolved_order: Option<&ResolvedOrder>,
960) -> Option<Vec<usize>> {
961 Some(resolved_order?.referenced_slots())
962}
963
964fn slot_map_for_schema_plan(
965 schema_info: &SchemaInfo,
966 plan: &AccessPlannedQuery,
967) -> Option<Vec<usize>> {
968 let executable = plan.access.executable_contract();
969
970 resolved_index_slots_for_access_path(schema_info, &executable)
971}
972
973fn resolved_index_slots_for_access_path(
974 schema_info: &SchemaInfo,
975 access: &ExecutableAccessPlan<'_, crate::value::Value>,
976) -> Option<Vec<usize>> {
977 let path = access.as_path()?;
978 let path_facts = path.shape_facts();
979 let key_items = path_facts.index_key_items_for_slot_map()?;
980 let mut slots = Vec::new();
981
982 match key_items.key_items() {
983 SemanticIndexKeyItemsRef::Fields(fields) => {
984 slots.reserve(fields.len());
985 for field_name in fields {
986 let slot = schema_info.field_slot_index(field_name)?;
987 slots.push(slot);
988 }
989 }
990 SemanticIndexKeyItemsRef::Accepted(items) => {
991 slots.reserve(items.len());
992 for key_item in items {
993 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
994 slots.push(slot);
995 }
996 }
997 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
998 slots.reserve(fields.len());
999 for &field_name in fields {
1000 let slot = schema_info.field_slot_index(field_name)?;
1001 slots.push(slot);
1002 }
1003 }
1004 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1005 slots.reserve(items.len());
1006 for key_item in items {
1007 let slot = schema_info.field_slot_index(key_item.field())?;
1008 slots.push(slot);
1009 }
1010 }
1011 }
1012
1013 Some(slots)
1014}
1015
1016fn index_compile_targets_for_schema_plan(
1017 schema_info: &SchemaInfo,
1018 plan: &AccessPlannedQuery,
1019) -> Option<Vec<IndexCompileTarget>> {
1020 let executable = plan.access.executable_contract();
1021 let path = executable.as_path()?;
1022 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1023 let mut targets = Vec::new();
1024
1025 match key_items.key_items() {
1026 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1027 return None;
1028 }
1029 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1030 for (component_index, &field_name) in fields.iter().enumerate() {
1031 let field_slot = schema_info.field_slot_index(field_name)?;
1032 targets.push(IndexCompileTarget {
1033 component_index,
1034 field_slot,
1035 key_item: IndexKeyItem::Field(field_name),
1036 });
1037 }
1038 }
1039 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1040 for (component_index, &key_item) in items.iter().enumerate() {
1041 let field_slot = schema_info.field_slot_index(key_item.field())?;
1042 targets.push(IndexCompileTarget {
1043 component_index,
1044 field_slot,
1045 key_item,
1046 });
1047 }
1048 }
1049 }
1050
1051 Some(targets)
1052}