1use crate::{
7 db::{
8 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
9 predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10 query::plan::{
11 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15 ResolvedOrderValueSource, ScalarPlan, StaticExecutionPlanningContract,
16 derive_logical_pushdown_eligibility,
17 expr::{
18 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19 compile_scalar_projection_plan_with_schema,
20 },
21 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22 grouped_cursor_policy_violation, grouped_plan_strategy,
23 lower_data_row_direct_projection_slots_with_schema,
24 lower_direct_projection_slots_with_schema, lower_projection_identity,
25 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26 residual_query_predicate_after_filtered_access_contract,
27 resolved_grouped_distinct_execution_strategy_with_schema_info,
28 },
29 schema::SchemaInfo,
30 },
31 error::InternalError,
32 model::{
33 entity::EntityModel,
34 index::{IndexKeyItem, IndexKeyItemsRef},
35 },
36};
37
38impl QueryMode {
39 #[must_use]
41 pub const fn is_load(&self) -> bool {
42 match self {
43 Self::Load(_) => true,
44 Self::Delete(_) => false,
45 }
46 }
47
48 #[must_use]
50 pub const fn is_delete(&self) -> bool {
51 match self {
52 Self::Delete(_) => true,
53 Self::Load(_) => false,
54 }
55 }
56}
57
58impl LogicalPlan {
59 #[must_use]
61 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62 match self {
63 Self::Scalar(plan) => plan,
64 Self::Grouped(plan) => &plan.scalar,
65 }
66 }
67
68 #[must_use]
70 #[cfg(test)]
71 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72 match self {
73 Self::Scalar(plan) => plan,
74 Self::Grouped(plan) => &mut plan.scalar,
75 }
76 }
77
78 #[must_use]
80 #[cfg(test)]
81 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82 self.scalar_semantics()
83 }
84
85 #[must_use]
87 #[cfg(test)]
88 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89 self.scalar_semantics_mut()
90 }
91}
92
93impl AccessPlannedQuery {
94 #[must_use]
96 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97 self.logical.scalar_semantics()
98 }
99
100 #[must_use]
103 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
104 self.scalar_plan().consistency
105 }
106
107 #[must_use]
109 #[cfg(test)]
110 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
111 self.logical.scalar_semantics_mut()
112 }
113
114 #[must_use]
116 #[cfg(test)]
117 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
118 self.scalar_plan()
119 }
120
121 #[must_use]
123 #[cfg(test)]
124 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
125 self.scalar_plan_mut()
126 }
127
128 #[must_use]
130 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
131 match &self.logical {
132 LogicalPlan::Scalar(_) => None,
133 LogicalPlan::Grouped(plan) => Some(plan),
134 }
135 }
136
137 #[must_use]
139 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
140 if let Some(static_contract) = &self.static_execution_planning_contract {
141 return static_contract.projection_spec.clone();
142 }
143
144 lower_projection_intent(model, &self.logical, &self.projection_selection)
145 }
146
147 #[must_use]
149 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
150 lower_projection_identity(&self.logical, &self.projection_selection)
151 }
152
153 #[must_use]
159 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
160 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
161 return static_contract.execution_preparation_predicate.clone();
162 }
163
164 derive_execution_preparation_predicate(self)
165 }
166
167 #[must_use]
171 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
172 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
173 return static_contract.residual_filter_predicate.clone();
174 }
175
176 derive_residual_filter_predicate(self)
177 }
178
179 #[must_use]
182 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
183 self.effective_execution_predicate().is_some()
184 }
185
186 #[must_use]
189 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
190 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
191 return static_contract.residual_filter_expr.as_ref();
192 }
193
194 if !derive_has_residual_filter(self) {
195 return None;
196 }
197
198 self.scalar_plan().filter_expr.as_ref()
199 }
200
201 #[must_use]
204 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
205 self.residual_filter_expr().is_some()
206 }
207
208 #[must_use]
210 pub(in crate::db) const fn execution_preparation_compiled_predicate(
211 &self,
212 ) -> Option<&PredicateProgram> {
213 self.static_execution_planning_contract()
214 .execution_preparation_compiled_predicate
215 .as_ref()
216 }
217
218 #[must_use]
220 pub(in crate::db) const fn effective_runtime_compiled_predicate(
221 &self,
222 ) -> Option<&PredicateProgram> {
223 match self
224 .static_execution_planning_contract()
225 .effective_runtime_filter_program
226 .as_ref()
227 {
228 Some(program) => program.predicate_program(),
229 None => None,
230 }
231 }
232
233 #[cfg(test)]
235 #[must_use]
236 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
237 &self,
238 ) -> Option<&CompiledExpr> {
239 match self
240 .static_execution_planning_contract()
241 .effective_runtime_filter_program
242 .as_ref()
243 {
244 Some(program) => program.expression_filter(),
245 None => None,
246 }
247 }
248
249 #[must_use]
251 pub(in crate::db) const fn effective_runtime_filter_program(
252 &self,
253 ) -> Option<&EffectiveRuntimeFilterProgram> {
254 self.static_execution_planning_contract()
255 .effective_runtime_filter_program
256 .as_ref()
257 }
258
259 #[must_use]
261 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
262 if !self.scalar_plan().distinct {
263 return DistinctExecutionStrategy::None;
264 }
265
266 match distinct_runtime_dedup_strategy(&self.access) {
270 Some(strategy) => strategy,
271 None => DistinctExecutionStrategy::None,
272 }
273 }
274
275 #[cfg(test)]
277 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
278 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
279 }
280
281 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
283 &mut self,
284 schema_info: &SchemaInfo,
285 ) {
286 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
287 }
288
289 #[cfg(test)]
291 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_only(
292 &mut self,
293 model: &EntityModel,
294 ) -> Result<(), InternalError> {
295 self.finalize_static_execution_planning_contract_for_model_with_schema(
296 model,
297 SchemaInfo::cached_for_generated_entity_model(model),
298 )
299 }
300
301 pub(in crate::db) fn finalize_static_execution_planning_contract_for_model_with_schema(
303 &mut self,
304 model: &EntityModel,
305 schema_info: &SchemaInfo,
306 ) -> Result<(), InternalError> {
307 self.static_execution_planning_contract = Some(
308 project_static_execution_planning_contract_for_model(model, schema_info, self)?,
309 );
310
311 Ok(())
312 }
313
314 #[must_use]
316 pub(in crate::db) fn execution_shape_signature(
317 &self,
318 entity_path: &'static str,
319 ) -> ExecutionShapeSignature {
320 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
321 }
322
323 #[must_use]
326 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
327 if let Some(static_contract) = self.static_execution_planning_contract.as_ref() {
328 return self.scalar_plan().predicate.is_some()
329 && static_contract.residual_filter_predicate.is_none()
330 && static_contract.residual_filter_expr.is_none();
331 }
332
333 derive_predicate_fully_satisfied_by_access_contract(self)
334 }
335
336 #[must_use]
338 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
339 self.static_execution_planning_contract()
340 .scalar_projection_plan
341 .as_deref()
342 }
343
344 #[must_use]
346 pub(in crate::db) const fn has_static_execution_planning_contract(&self) -> bool {
347 self.static_execution_planning_contract.is_some()
348 }
349
350 #[must_use]
352 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
353 self.static_execution_planning_contract()
354 .primary_key_names
355 .iter()
356 .map(String::as_str)
357 .collect()
358 }
359
360 #[must_use]
362 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
363 self.static_execution_planning_contract()
364 .projection_referenced_slots
365 .as_slice()
366 }
367
368 #[must_use]
370 #[cfg(any(test, feature = "diagnostics"))]
371 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
372 self.static_execution_planning_contract()
373 .projected_slot_mask
374 .as_slice()
375 }
376
377 #[must_use]
379 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
380 self.static_execution_planning_contract()
381 .projection_is_model_identity
382 }
383
384 #[must_use]
386 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
387 self.static_execution_planning_contract()
388 .order_referenced_slots
389 .as_deref()
390 }
391
392 #[must_use]
394 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
395 self.static_execution_planning_contract()
396 .resolved_order
397 .as_ref()
398 }
399
400 #[must_use]
402 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
403 self.static_execution_planning_contract()
404 .slot_map
405 .as_deref()
406 }
407
408 #[must_use]
410 pub(in crate::db) fn grouped_aggregate_execution_specs(
411 &self,
412 ) -> Option<&[GroupedAggregateExecutionSpec]> {
413 self.static_execution_planning_contract()
414 .grouped_aggregate_execution_specs
415 .as_deref()
416 }
417
418 #[must_use]
420 pub(in crate::db) const fn grouped_distinct_execution_strategy(
421 &self,
422 ) -> Option<&GroupedDistinctExecutionStrategy> {
423 self.static_execution_planning_contract()
424 .grouped_distinct_execution_strategy
425 .as_ref()
426 }
427
428 #[must_use]
430 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
431 &self.static_execution_planning_contract().projection_spec
432 }
433
434 #[must_use]
436 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
437 self.static_execution_planning_contract()
438 .projection_direct_slots
439 .as_deref()
440 }
441
442 #[must_use]
444 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
445 self.static_execution_planning_contract()
446 .projection_data_row_direct_slots
447 .as_deref()
448 }
449
450 #[must_use]
452 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
453 self.static_execution_planning_contract()
454 .index_compile_targets
455 .as_deref()
456 }
457
458 const fn static_execution_planning_contract(&self) -> &StaticExecutionPlanningContract {
459 self.static_execution_planning_contract
460 .as_ref()
461 .expect("access-planned queries must freeze static execution planning contract before execution")
462 }
463}
464
465fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
466 match access {
467 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
468 Some(DistinctExecutionStrategy::PreOrdered)
469 }
470 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
471 Some(DistinctExecutionStrategy::HashMaterialize)
472 }
473 AccessPlan::Path(_) => None,
474 }
475}
476
477fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
478 let is_grouped_safe = plan
479 .grouped_plan()
480 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
481
482 ContinuationPolicy::new(
483 true, true, is_grouped_safe,
486 )
487}
488
/// Project the planner route profile for `plan`, taking primary-key names
/// from the entity model (test builds only).
#[cfg(test)]
#[must_use]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let primary_key_names = ordered_primary_key_names(model);
    let order = plan.scalar_plan().order.as_ref();
    let secondary_order_contract = order.and_then(|order| {
        order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
    });

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
507
508#[must_use]
510pub(in crate::db) fn project_planner_route_profile_for_schema(
511 schema_info: &SchemaInfo,
512 plan: &AccessPlannedQuery,
513) -> PlannerRouteProfile {
514 let primary_key_names = primary_key_names_from_schema(schema_info);
515 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
516 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
517 });
518
519 PlannerRouteProfile::new(
520 derive_continuation_policy_validated(plan),
521 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
522 secondary_order_contract,
523 )
524}
525
526fn project_static_execution_planning_contract_for_model(
527 model: &EntityModel,
528 schema_info: &SchemaInfo,
529 plan: &AccessPlannedQuery,
530) -> Result<StaticExecutionPlanningContract, InternalError> {
531 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
532 let execution_preparation_predicate = plan.execution_preparation_predicate();
533 let residual_filter_predicate = derive_residual_filter_predicate(plan);
534 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
535 let execution_preparation_compiled_predicate =
536 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
537 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
538 schema_info,
539 residual_filter_expr.as_ref(),
540 residual_filter_predicate.as_ref(),
541 )?;
542 let scalar_projection_plan = if plan.grouped_plan().is_none() {
543 Some(
544 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
545 .ok_or_else(|| {
546 InternalError::query_executor_invariant(
547 "scalar projection program must compile during static planning finalization",
548 )
549 })?
550 .iter()
551 .map(CompiledExpr::compile)
552 .collect(),
553 )
554 } else {
555 None
556 };
557 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
558 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
559 let projection_direct_slots = lower_direct_projection_slots_with_schema(
560 model,
561 schema_info,
562 &plan.logical,
563 &plan.projection_selection,
564 );
565 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
566 model,
567 schema_info,
568 &plan.logical,
569 &plan.projection_selection,
570 );
571 let projection_referenced_slots =
572 projection_spec.referenced_slots_for_schema(model, schema_info)?;
573 let projected_slot_mask =
574 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
575 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
576 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
577 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
578 let slot_map = slot_map_for_schema_plan(schema_info, plan);
579 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
580
581 Ok(StaticExecutionPlanningContract {
582 primary_key_names: schema_info.primary_key_names().to_vec(),
583 projection_spec,
584 execution_preparation_predicate,
585 residual_filter_expr,
586 residual_filter_predicate,
587 execution_preparation_compiled_predicate,
588 effective_runtime_filter_program,
589 scalar_projection_plan,
590 grouped_aggregate_execution_specs,
591 grouped_distinct_execution_strategy,
592 projection_direct_slots,
593 projection_data_row_direct_slots,
594 projection_referenced_slots,
595 projected_slot_mask,
596 projection_is_model_identity,
597 resolved_order,
598 order_referenced_slots,
599 slot_map,
600 index_compile_targets,
601 })
602}
603
/// Return the primary-key field names reported by `model`.
///
/// Test-only forwarder to `EntityModel::primary_key_names`.
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    model.primary_key_names()
}
608
609fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
610 schema_info
611 .primary_key_names()
612 .iter()
613 .map(String::as_str)
614 .collect()
615}
616
617fn compile_effective_runtime_filter_program(
621 schema_info: &SchemaInfo,
622 residual_filter_expr: Option<&Expr>,
623 residual_filter_predicate: Option<&Predicate>,
624) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
625 if let Some(predicate) = residual_filter_predicate {
630 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
631 PredicateProgram::compile_with_schema_info(schema_info, predicate),
632 )));
633 }
634
635 if let Some(filter_expr) = residual_filter_expr {
636 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
637 .ok_or_else(|| {
638 InternalError::query_invalid_logical_plan(
639 "effective runtime scalar filter expression must compile during static planning finalization",
640 )
641 })?;
642
643 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
644 CompiledExpr::compile(&compiled),
645 )));
646 }
647
648 Ok(None)
649}
650
651fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
655 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
656
657 match plan.access.selected_index_contract() {
658 Some(index) => {
659 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
660 }
661 None => Some(query_predicate.clone()),
662 }
663}
664
665fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
669 let filtered_residual = derive_execution_preparation_predicate(plan);
670 let filtered_residual = filtered_residual.as_ref()?;
671
672 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
673}
674
675fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
679 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
680 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
681 return None;
682 }
683
684 Some(filter_expr.clone())
685}
686
687fn derive_residual_filter_expr_for_model(
691 model: &EntityModel,
692 plan: &AccessPlannedQuery,
693) -> Option<Expr> {
694 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
695 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
696 return None;
697 }
698
699 Some(filter_expr.clone())
700}
701
702fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
706 match (
707 plan.scalar_plan().filter_expr.as_ref(),
708 plan.scalar_plan().predicate.as_ref(),
709 ) {
710 (None, None) => false,
711 (Some(_), None) => true,
712 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
713 }
714}
715
716fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
719 plan.scalar_plan().predicate.is_some()
720 && derive_residual_filter_predicate(plan).is_none()
721 && derive_residual_filter_expr(plan).is_none()
722}
723
724const fn derive_semantic_filter_fully_satisfied_by_access_contract(
728 plan: &AccessPlannedQuery,
729) -> bool {
730 plan.scalar_plan().filter_expr.is_some()
731 && plan.scalar_plan().predicate.is_some()
732 && plan.scalar_plan().predicate_covers_filter_expr
733}
734
/// Model-aware entry point for the semantic filter coverage check.
///
/// `_model` is currently unused; the result is exactly that of
/// `derive_semantic_filter_fully_satisfied_by_access_contract`.
const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
    _model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
}
744
745fn compile_optional_predicate(
748 schema_info: &SchemaInfo,
749 predicate: Option<&Predicate>,
750) -> Option<PredicateProgram> {
751 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
752}
753
754fn resolve_grouped_static_planning_semantics(
758 schema_info: &SchemaInfo,
759 plan: &AccessPlannedQuery,
760 projection_spec: &ProjectionSpec,
761) -> Result<
762 (
763 Option<Vec<GroupedAggregateExecutionSpec>>,
764 Option<GroupedDistinctExecutionStrategy>,
765 ),
766 InternalError,
767> {
768 let Some(grouped) = plan.grouped_plan() else {
769 return Ok((None, None));
770 };
771
772 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
773 projection_spec,
774 grouped.group.group_fields.as_slice(),
775 grouped.group.aggregates.as_slice(),
776 )?;
777 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
778
779 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
780 schema_info,
781 aggregate_specs.as_slice(),
782 )?);
783 let grouped_distinct_execution_strategy = Some(
784 resolved_grouped_distinct_execution_strategy_with_schema_info(
785 schema_info,
786 grouped.group.group_fields.as_slice(),
787 grouped.group.aggregates.as_slice(),
788 grouped.having_expr.as_ref(),
789 )?,
790 );
791
792 Ok((
793 grouped_aggregate_execution_specs,
794 grouped_distinct_execution_strategy,
795 ))
796}
797
798fn extend_grouped_having_aggregate_specs(
799 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
800 grouped: &GroupPlan,
801) -> Result<(), InternalError> {
802 if let Some(having_expr) = grouped.having_expr.as_ref() {
803 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
804 }
805
806 Ok(())
807}
808
809fn collect_grouped_having_expr_aggregate_specs(
810 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
811 expr: &Expr,
812) -> Result<(), InternalError> {
813 if !expr.contains_aggregate() {
814 return Ok(());
815 }
816
817 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
818 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
819
820 if aggregate_specs
821 .iter()
822 .all(|current| current != &aggregate_spec)
823 {
824 aggregate_specs.push(aggregate_spec);
825 }
826
827 Ok(())
828 })
829}
830
831fn projected_slot_mask_for_spec(
832 model: &EntityModel,
833 direct_projection_slots: Option<&[usize]>,
834) -> Vec<bool> {
835 let schema_slot_len = direct_projection_slots
836 .and_then(|slots| slots.iter().copied().max())
837 .map_or(0, |slot| slot.saturating_add(1));
838 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
839
840 let Some(direct_projection_slots) = direct_projection_slots else {
841 return projected_slots;
842 };
843
844 for slot in direct_projection_slots.iter().copied() {
845 if let Some(projected) = projected_slots.get_mut(slot) {
846 *projected = true;
847 }
848 }
849
850 projected_slots
851}
852
853fn resolved_order_for_plan(
854 schema_info: &SchemaInfo,
855 plan: &AccessPlannedQuery,
856) -> Result<Option<ResolvedOrder>, InternalError> {
857 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
858 return Ok(None);
859 }
860
861 let Some(order) = plan.scalar_plan().order.as_ref() else {
862 return Ok(None);
863 };
864
865 let mut fields = Vec::with_capacity(order.fields.len());
866 for term in &order.fields {
867 fields.push(ResolvedOrderField::new(
868 resolved_order_value_source_for_term(schema_info, term)?,
869 term.direction(),
870 ));
871 }
872
873 Ok(Some(ResolvedOrder::new(fields)))
874}
875
876fn resolved_order_value_source_for_term(
877 schema_info: &SchemaInfo,
878 term: &crate::db::query::plan::OrderTerm,
879) -> Result<ResolvedOrderValueSource, InternalError> {
880 if term.direct_field().is_none() {
881 let rendered = term.rendered_label();
882 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
883 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
884 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
885
886 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
887 &compiled,
888 )));
889 }
890
891 let field = term
892 .direct_field()
893 .expect("direct-field order branch should only execute for field-backed terms");
894 let slot = resolve_required_schema_slot(schema_info, field, || {
895 InternalError::query_invalid_logical_plan(format!(
896 "order expression references unknown field '{field}'",
897 ))
898 })?;
899
900 Ok(ResolvedOrderValueSource::direct_field(slot))
901}
902
903fn validate_resolved_order_expr_fields(
904 schema_info: &SchemaInfo,
905 expr: &Expr,
906 rendered: &str,
907) -> Result<(), InternalError> {
908 expr.try_for_each_tree_expr(&mut |node| match node {
909 Expr::Field(field_id) => {
910 resolve_required_schema_slot(schema_info, field_id.as_str(), || {
911 InternalError::query_invalid_logical_plan(format!(
912 "order expression references unknown field '{rendered}'",
913 ))
914 })
915 .map(|_| ())
916 }
917 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
918 #[cfg(test)]
919 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
920 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
921 _ => Ok(()),
922 })
923}
924
925fn resolve_required_schema_slot<F>(
929 schema_info: &SchemaInfo,
930 field: &str,
931 invalid_plan_error: F,
932) -> Result<usize, InternalError>
933where
934 F: FnOnce() -> InternalError,
935{
936 schema_info
937 .field_slot_index(field)
938 .ok_or_else(invalid_plan_error)
939}
940
941fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
944 InternalError::query_invalid_logical_plan(format!(
945 "order expression '{rendered}' did not stay on the scalar expression seam",
946 ))
947}
948
949fn order_referenced_slots_for_resolved_order(
954 resolved_order: Option<&ResolvedOrder>,
955) -> Option<Vec<usize>> {
956 Some(resolved_order?.referenced_slots())
957}
958
959fn slot_map_for_schema_plan(
960 schema_info: &SchemaInfo,
961 plan: &AccessPlannedQuery,
962) -> Option<Vec<usize>> {
963 let executable = plan.access.executable_contract();
964
965 resolved_index_slots_for_access_path(schema_info, &executable)
966}
967
968fn resolved_index_slots_for_access_path(
969 schema_info: &SchemaInfo,
970 access: &ExecutableAccessPlan<'_, crate::value::Value>,
971) -> Option<Vec<usize>> {
972 let path = access.as_path()?;
973 let path_facts = path.shape_facts();
974 let key_items = path_facts.index_key_items_for_slot_map()?;
975 let mut slots = Vec::new();
976
977 match key_items.key_items() {
978 SemanticIndexKeyItemsRef::Fields(fields) => {
979 slots.reserve(fields.len());
980 for field_name in fields {
981 let slot = schema_info.field_slot_index(field_name)?;
982 slots.push(slot);
983 }
984 }
985 SemanticIndexKeyItemsRef::Accepted(items) => {
986 slots.reserve(items.len());
987 for key_item in items {
988 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
989 slots.push(slot);
990 }
991 }
992 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
993 slots.reserve(fields.len());
994 for &field_name in fields {
995 let slot = schema_info.field_slot_index(field_name)?;
996 slots.push(slot);
997 }
998 }
999 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1000 slots.reserve(items.len());
1001 for key_item in items {
1002 let slot = schema_info.field_slot_index(key_item.field())?;
1003 slots.push(slot);
1004 }
1005 }
1006 }
1007
1008 Some(slots)
1009}
1010
1011fn index_compile_targets_for_schema_plan(
1012 schema_info: &SchemaInfo,
1013 plan: &AccessPlannedQuery,
1014) -> Option<Vec<IndexCompileTarget>> {
1015 let executable = plan.access.executable_contract();
1016 let path = executable.as_path()?;
1017 let key_items = path.shape_facts().index_key_items_for_slot_map()?;
1018 let mut targets = Vec::new();
1019
1020 match key_items.key_items() {
1021 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1022 return None;
1023 }
1024 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1025 for (component_index, &field_name) in fields.iter().enumerate() {
1026 let field_slot = schema_info.field_slot_index(field_name)?;
1027 targets.push(IndexCompileTarget {
1028 component_index,
1029 field_slot,
1030 key_item: IndexKeyItem::Field(field_name),
1031 });
1032 }
1033 }
1034 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1035 for (component_index, &key_item) in items.iter().enumerate() {
1036 let field_slot = schema_info.field_slot_index(key_item.field())?;
1037 targets.push(IndexCompileTarget {
1038 component_index,
1039 field_slot,
1040 key_item,
1041 });
1042 }
1043 }
1044 }
1045
1046 Some(targets)
1047}