1use crate::{
7 db::{
8 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
9 predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10 query::plan::{
11 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15 ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16 derive_logical_pushdown_eligibility,
17 expr::{
18 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19 compile_scalar_projection_plan_with_schema,
20 },
21 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22 grouped_cursor_policy_violation, grouped_plan_strategy,
23 lower_data_row_direct_projection_slots_with_schema,
24 lower_direct_projection_slots_with_schema, lower_projection_identity,
25 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26 residual_query_predicate_after_filtered_access_contract,
27 resolved_grouped_distinct_execution_strategy_with_schema_info,
28 },
29 schema::SchemaInfo,
30 },
31 error::InternalError,
32 model::{
33 entity::EntityModel,
34 index::{IndexKeyItem, IndexKeyItemsRef},
35 },
36};
37
38impl QueryMode {
39 #[must_use]
41 pub const fn is_load(&self) -> bool {
42 match self {
43 Self::Load(_) => true,
44 Self::Delete(_) => false,
45 }
46 }
47
48 #[must_use]
50 pub const fn is_delete(&self) -> bool {
51 match self {
52 Self::Delete(_) => true,
53 Self::Load(_) => false,
54 }
55 }
56}
57
58impl LogicalPlan {
59 #[must_use]
61 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62 match self {
63 Self::Scalar(plan) => plan,
64 Self::Grouped(plan) => &plan.scalar,
65 }
66 }
67
68 #[must_use]
70 #[cfg(test)]
71 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72 match self {
73 Self::Scalar(plan) => plan,
74 Self::Grouped(plan) => &mut plan.scalar,
75 }
76 }
77
78 #[must_use]
80 #[cfg(test)]
81 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82 self.scalar_semantics()
83 }
84
85 #[must_use]
87 #[cfg(test)]
88 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89 self.scalar_semantics_mut()
90 }
91}
92
93impl AccessPlannedQuery {
94 #[must_use]
96 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97 self.logical.scalar_semantics()
98 }
99
100 #[must_use]
103 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
104 self.scalar_plan().consistency
105 }
106
107 #[must_use]
109 #[cfg(test)]
110 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
111 self.logical.scalar_semantics_mut()
112 }
113
114 #[must_use]
116 #[cfg(test)]
117 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
118 self.scalar_plan()
119 }
120
121 #[must_use]
123 #[cfg(test)]
124 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
125 self.scalar_plan_mut()
126 }
127
128 #[must_use]
130 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
131 match &self.logical {
132 LogicalPlan::Scalar(_) => None,
133 LogicalPlan::Grouped(plan) => Some(plan),
134 }
135 }
136
137 #[must_use]
139 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
140 if let Some(static_shape) = &self.static_planning_shape {
141 return static_shape.projection_spec.clone();
142 }
143
144 lower_projection_intent(model, &self.logical, &self.projection_selection)
145 }
146
147 #[must_use]
149 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
150 lower_projection_identity(&self.logical, &self.projection_selection)
151 }
152
153 #[must_use]
159 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
160 if let Some(static_shape) = self.static_planning_shape.as_ref() {
161 return static_shape.execution_preparation_predicate.clone();
162 }
163
164 derive_execution_preparation_predicate(self)
165 }
166
167 #[must_use]
171 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
172 if let Some(static_shape) = self.static_planning_shape.as_ref() {
173 return static_shape.residual_filter_predicate.clone();
174 }
175
176 derive_residual_filter_predicate(self)
177 }
178
179 #[must_use]
182 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
183 self.effective_execution_predicate().is_some()
184 }
185
186 #[must_use]
189 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
190 if let Some(static_shape) = self.static_planning_shape.as_ref() {
191 return static_shape.residual_filter_expr.as_ref();
192 }
193
194 if !derive_has_residual_filter(self) {
195 return None;
196 }
197
198 self.scalar_plan().filter_expr.as_ref()
199 }
200
    /// Reports whether `residual_filter_expr()` yields an expression.
    #[must_use]
    pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
        self.residual_filter_expr().is_some()
    }
207
208 #[must_use]
210 pub(in crate::db) const fn execution_preparation_compiled_predicate(
211 &self,
212 ) -> Option<&PredicateProgram> {
213 self.static_planning_shape()
214 .execution_preparation_compiled_predicate
215 .as_ref()
216 }
217
218 #[must_use]
220 pub(in crate::db) const fn effective_runtime_compiled_predicate(
221 &self,
222 ) -> Option<&PredicateProgram> {
223 match self
224 .static_planning_shape()
225 .effective_runtime_filter_program
226 .as_ref()
227 {
228 Some(program) => program.predicate_program(),
229 None => None,
230 }
231 }
232
233 #[cfg(test)]
235 #[must_use]
236 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
237 &self,
238 ) -> Option<&CompiledExpr> {
239 match self
240 .static_planning_shape()
241 .effective_runtime_filter_program
242 .as_ref()
243 {
244 Some(program) => program.expression_filter(),
245 None => None,
246 }
247 }
248
249 #[must_use]
251 pub(in crate::db) const fn effective_runtime_filter_program(
252 &self,
253 ) -> Option<&EffectiveRuntimeFilterProgram> {
254 self.static_planning_shape()
255 .effective_runtime_filter_program
256 .as_ref()
257 }
258
259 #[must_use]
261 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
262 if !self.scalar_plan().distinct {
263 return DistinctExecutionStrategy::None;
264 }
265
266 match distinct_runtime_dedup_strategy(&self.access) {
270 Some(strategy) => strategy,
271 None => DistinctExecutionStrategy::None,
272 }
273 }
274
275 #[cfg(test)]
277 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
278 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
279 }
280
281 pub(in crate::db) fn finalize_planner_route_profile_for_model_with_schema(
283 &mut self,
284 schema_info: &SchemaInfo,
285 ) {
286 self.set_planner_route_profile(project_planner_route_profile_for_schema(schema_info, self));
287 }
288
289 #[cfg(test)]
291 pub(in crate::db) fn finalize_static_planning_shape_for_model_only(
292 &mut self,
293 model: &EntityModel,
294 ) -> Result<(), InternalError> {
295 self.finalize_static_planning_shape_for_model_with_schema(
296 model,
297 SchemaInfo::cached_for_generated_entity_model(model),
298 )
299 }
300
301 pub(in crate::db) fn finalize_static_planning_shape_for_model_with_schema(
303 &mut self,
304 model: &EntityModel,
305 schema_info: &SchemaInfo,
306 ) -> Result<(), InternalError> {
307 self.static_planning_shape = Some(project_static_planning_shape_for_model(
308 model,
309 schema_info,
310 self,
311 )?);
312
313 Ok(())
314 }
315
316 #[must_use]
318 pub(in crate::db) fn execution_shape_signature(
319 &self,
320 entity_path: &'static str,
321 ) -> ExecutionShapeSignature {
322 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
323 }
324
325 #[must_use]
328 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
329 if let Some(static_shape) = self.static_planning_shape.as_ref() {
330 return self.scalar_plan().predicate.is_some()
331 && static_shape.residual_filter_predicate.is_none()
332 && static_shape.residual_filter_expr.is_none();
333 }
334
335 derive_predicate_fully_satisfied_by_access_contract(self)
336 }
337
338 #[must_use]
340 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
341 self.static_planning_shape()
342 .scalar_projection_plan
343 .as_deref()
344 }
345
    /// Reports whether a static planning shape has been stored on this plan.
    #[must_use]
    pub(in crate::db) const fn has_static_planning_shape(&self) -> bool {
        self.static_planning_shape.is_some()
    }
351
352 #[must_use]
354 pub(in crate::db) fn primary_key_names(&self) -> Vec<&str> {
355 self.static_planning_shape()
356 .primary_key_names
357 .iter()
358 .map(String::as_str)
359 .collect()
360 }
361
362 #[must_use]
364 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
365 self.static_planning_shape()
366 .projection_referenced_slots
367 .as_slice()
368 }
369
370 #[must_use]
372 #[cfg(any(test, feature = "diagnostics"))]
373 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
374 self.static_planning_shape().projected_slot_mask.as_slice()
375 }
376
377 #[must_use]
379 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
380 self.static_planning_shape().projection_is_model_identity
381 }
382
383 #[must_use]
385 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
386 self.static_planning_shape()
387 .order_referenced_slots
388 .as_deref()
389 }
390
391 #[must_use]
393 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
394 self.static_planning_shape().resolved_order.as_ref()
395 }
396
397 #[must_use]
399 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
400 self.static_planning_shape().slot_map.as_deref()
401 }
402
403 #[must_use]
405 pub(in crate::db) fn grouped_aggregate_execution_specs(
406 &self,
407 ) -> Option<&[GroupedAggregateExecutionSpec]> {
408 self.static_planning_shape()
409 .grouped_aggregate_execution_specs
410 .as_deref()
411 }
412
413 #[must_use]
415 pub(in crate::db) const fn grouped_distinct_execution_strategy(
416 &self,
417 ) -> Option<&GroupedDistinctExecutionStrategy> {
418 self.static_planning_shape()
419 .grouped_distinct_execution_strategy
420 .as_ref()
421 }
422
423 #[must_use]
425 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
426 &self.static_planning_shape().projection_spec
427 }
428
429 #[must_use]
431 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
432 self.static_planning_shape()
433 .projection_direct_slots
434 .as_deref()
435 }
436
437 #[must_use]
439 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
440 self.static_planning_shape()
441 .projection_data_row_direct_slots
442 .as_deref()
443 }
444
445 #[must_use]
447 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
448 self.static_planning_shape()
449 .index_compile_targets
450 .as_deref()
451 }
452
453 const fn static_planning_shape(&self) -> &StaticPlanningShape {
454 self.static_planning_shape
455 .as_ref()
456 .expect("access-planned queries must freeze static planning shape before execution")
457 }
458}
459
460fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
461 match access {
462 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
463 Some(DistinctExecutionStrategy::PreOrdered)
464 }
465 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
466 Some(DistinctExecutionStrategy::HashMaterialize)
467 }
468 AccessPlan::Path(_) => None,
469 }
470}
471
472fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
473 let is_grouped_safe = plan
474 .grouped_plan()
475 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
476
477 ContinuationPolicy::new(
478 true, true, is_grouped_safe,
481 )
482}
483
/// Projects the planner route profile using primary-key names taken from the
/// entity model (test builds only).
///
/// The secondary order contract is computed from the scalar plan's `order`,
/// when present, against those primary-key names.
#[must_use]
#[cfg(test)]
pub(in crate::db) fn project_planner_route_profile_for_model(
    model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> PlannerRouteProfile {
    let primary_key_names = ordered_primary_key_names(model);
    let secondary_order_contract = match plan.scalar_plan().order.as_ref() {
        Some(order) => {
            order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
        }
        None => None,
    };

    let continuation_policy = derive_continuation_policy_validated(plan);
    let pushdown_eligibility =
        derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref());

    PlannerRouteProfile::new(
        continuation_policy,
        pushdown_eligibility,
        secondary_order_contract,
    )
}
502
503#[must_use]
505pub(in crate::db) fn project_planner_route_profile_for_schema(
506 schema_info: &SchemaInfo,
507 plan: &AccessPlannedQuery,
508) -> PlannerRouteProfile {
509 let primary_key_names = primary_key_names_from_schema(schema_info);
510 let secondary_order_contract = plan.scalar_plan().order.as_ref().and_then(|order| {
511 order.deterministic_secondary_order_contract_fields(primary_key_names.as_slice())
512 });
513
514 PlannerRouteProfile::new(
515 derive_continuation_policy_validated(plan),
516 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
517 secondary_order_contract,
518 )
519}
520
521fn project_static_planning_shape_for_model(
522 model: &EntityModel,
523 schema_info: &SchemaInfo,
524 plan: &AccessPlannedQuery,
525) -> Result<StaticPlanningShape, InternalError> {
526 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
527 let execution_preparation_predicate = plan.execution_preparation_predicate();
528 let residual_filter_predicate = derive_residual_filter_predicate(plan);
529 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
530 let execution_preparation_compiled_predicate =
531 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
532 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
533 schema_info,
534 residual_filter_expr.as_ref(),
535 residual_filter_predicate.as_ref(),
536 )?;
537 let scalar_projection_plan = if plan.grouped_plan().is_none() {
538 Some(
539 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
540 .ok_or_else(|| {
541 InternalError::query_executor_invariant(
542 "scalar projection program must compile during static planning finalization",
543 )
544 })?
545 .iter()
546 .map(CompiledExpr::compile)
547 .collect(),
548 )
549 } else {
550 None
551 };
552 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
553 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
554 let projection_direct_slots = lower_direct_projection_slots_with_schema(
555 model,
556 schema_info,
557 &plan.logical,
558 &plan.projection_selection,
559 );
560 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
561 model,
562 schema_info,
563 &plan.logical,
564 &plan.projection_selection,
565 );
566 let projection_referenced_slots =
567 projection_spec.referenced_slots_for_schema(model, schema_info)?;
568 let projected_slot_mask =
569 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
570 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
571 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
572 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
573 let slot_map = slot_map_for_schema_plan(schema_info, plan);
574 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
575
576 Ok(StaticPlanningShape {
577 primary_key_names: schema_info.primary_key_names().to_vec(),
578 projection_spec,
579 execution_preparation_predicate,
580 residual_filter_expr,
581 residual_filter_predicate,
582 execution_preparation_compiled_predicate,
583 effective_runtime_filter_program,
584 scalar_projection_plan,
585 grouped_aggregate_execution_specs,
586 grouped_distinct_execution_strategy,
587 projection_direct_slots,
588 projection_data_row_direct_slots,
589 projection_referenced_slots,
590 projected_slot_mask,
591 projection_is_model_identity,
592 resolved_order,
593 order_referenced_slots,
594 slot_map,
595 index_compile_targets,
596 })
597}
598
/// Returns the primary-key names reported by the entity model (test builds
/// only).
#[cfg(test)]
fn ordered_primary_key_names(model: &EntityModel) -> Vec<&'static str> {
    model.primary_key_names()
}
603
604fn primary_key_names_from_schema(schema_info: &SchemaInfo) -> Vec<&str> {
605 schema_info
606 .primary_key_names()
607 .iter()
608 .map(String::as_str)
609 .collect()
610}
611
612fn compile_effective_runtime_filter_program(
616 schema_info: &SchemaInfo,
617 residual_filter_expr: Option<&Expr>,
618 residual_filter_predicate: Option<&Predicate>,
619) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
620 if let Some(predicate) = residual_filter_predicate {
625 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
626 PredicateProgram::compile_with_schema_info(schema_info, predicate),
627 )));
628 }
629
630 if let Some(filter_expr) = residual_filter_expr {
631 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
632 .ok_or_else(|| {
633 InternalError::query_invalid_logical_plan(
634 "effective runtime scalar filter expression must compile during static planning finalization",
635 )
636 })?;
637
638 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
639 CompiledExpr::compile(&compiled),
640 )));
641 }
642
643 Ok(None)
644}
645
646fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
650 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
651
652 match plan.access.selected_index_contract() {
653 Some(index) => {
654 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
655 }
656 None => Some(query_predicate.clone()),
657 }
658}
659
660fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
664 let filtered_residual = derive_execution_preparation_predicate(plan);
665 let filtered_residual = filtered_residual.as_ref()?;
666
667 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
668}
669
670fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
674 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
675 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
676 return None;
677 }
678
679 Some(filter_expr.clone())
680}
681
682fn derive_residual_filter_expr_for_model(
686 model: &EntityModel,
687 plan: &AccessPlannedQuery,
688) -> Option<Expr> {
689 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
690 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
691 return None;
692 }
693
694 Some(filter_expr.clone())
695}
696
697fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
701 match (
702 plan.scalar_plan().filter_expr.as_ref(),
703 plan.scalar_plan().predicate.as_ref(),
704 ) {
705 (None, None) => false,
706 (Some(_), None) => true,
707 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
708 }
709}
710
711fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
714 plan.scalar_plan().predicate.is_some()
715 && derive_residual_filter_predicate(plan).is_none()
716 && derive_residual_filter_expr(plan).is_none()
717}
718
719const fn derive_semantic_filter_fully_satisfied_by_access_contract(
723 plan: &AccessPlannedQuery,
724) -> bool {
725 plan.scalar_plan().filter_expr.is_some()
726 && plan.scalar_plan().predicate.is_some()
727 && plan.scalar_plan().predicate_covers_filter_expr
728}
729
/// Model-taking wrapper around
/// `derive_semantic_filter_fully_satisfied_by_access_contract`.
///
/// The `_model` argument is currently unused; the result depends on `plan`
/// alone.
const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
    _model: &EntityModel,
    plan: &AccessPlannedQuery,
) -> bool {
    derive_semantic_filter_fully_satisfied_by_access_contract(plan)
}
739
740fn compile_optional_predicate(
743 schema_info: &SchemaInfo,
744 predicate: Option<&Predicate>,
745) -> Option<PredicateProgram> {
746 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
747}
748
749fn resolve_grouped_static_planning_semantics(
753 schema_info: &SchemaInfo,
754 plan: &AccessPlannedQuery,
755 projection_spec: &ProjectionSpec,
756) -> Result<
757 (
758 Option<Vec<GroupedAggregateExecutionSpec>>,
759 Option<GroupedDistinctExecutionStrategy>,
760 ),
761 InternalError,
762> {
763 let Some(grouped) = plan.grouped_plan() else {
764 return Ok((None, None));
765 };
766
767 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
768 projection_spec,
769 grouped.group.group_fields.as_slice(),
770 grouped.group.aggregates.as_slice(),
771 )?;
772 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
773
774 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
775 schema_info,
776 aggregate_specs.as_slice(),
777 )?);
778 let grouped_distinct_execution_strategy = Some(
779 resolved_grouped_distinct_execution_strategy_with_schema_info(
780 schema_info,
781 grouped.group.group_fields.as_slice(),
782 grouped.group.aggregates.as_slice(),
783 grouped.having_expr.as_ref(),
784 )?,
785 );
786
787 Ok((
788 grouped_aggregate_execution_specs,
789 grouped_distinct_execution_strategy,
790 ))
791}
792
793fn extend_grouped_having_aggregate_specs(
794 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
795 grouped: &GroupPlan,
796) -> Result<(), InternalError> {
797 if let Some(having_expr) = grouped.having_expr.as_ref() {
798 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
799 }
800
801 Ok(())
802}
803
804fn collect_grouped_having_expr_aggregate_specs(
805 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
806 expr: &Expr,
807) -> Result<(), InternalError> {
808 if !expr.contains_aggregate() {
809 return Ok(());
810 }
811
812 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
813 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
814
815 if aggregate_specs
816 .iter()
817 .all(|current| current != &aggregate_spec)
818 {
819 aggregate_specs.push(aggregate_spec);
820 }
821
822 Ok(())
823 })
824}
825
826fn projected_slot_mask_for_spec(
827 model: &EntityModel,
828 direct_projection_slots: Option<&[usize]>,
829) -> Vec<bool> {
830 let schema_slot_len = direct_projection_slots
831 .and_then(|slots| slots.iter().copied().max())
832 .map_or(0, |slot| slot.saturating_add(1));
833 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
834
835 let Some(direct_projection_slots) = direct_projection_slots else {
836 return projected_slots;
837 };
838
839 for slot in direct_projection_slots.iter().copied() {
840 if let Some(projected) = projected_slots.get_mut(slot) {
841 *projected = true;
842 }
843 }
844
845 projected_slots
846}
847
848fn resolved_order_for_plan(
849 schema_info: &SchemaInfo,
850 plan: &AccessPlannedQuery,
851) -> Result<Option<ResolvedOrder>, InternalError> {
852 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
853 return Ok(None);
854 }
855
856 let Some(order) = plan.scalar_plan().order.as_ref() else {
857 return Ok(None);
858 };
859
860 let mut fields = Vec::with_capacity(order.fields.len());
861 for term in &order.fields {
862 fields.push(ResolvedOrderField::new(
863 resolved_order_value_source_for_term(schema_info, term)?,
864 term.direction(),
865 ));
866 }
867
868 Ok(Some(ResolvedOrder::new(fields)))
869}
870
871fn resolved_order_value_source_for_term(
872 schema_info: &SchemaInfo,
873 term: &crate::db::query::plan::OrderTerm,
874) -> Result<ResolvedOrderValueSource, InternalError> {
875 if term.direct_field().is_none() {
876 let rendered = term.rendered_label();
877 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
878 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
879 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
880
881 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
882 &compiled,
883 )));
884 }
885
886 let field = term
887 .direct_field()
888 .expect("direct-field order branch should only execute for field-backed terms");
889 let slot = resolve_required_schema_slot(schema_info, field, || {
890 InternalError::query_invalid_logical_plan(format!(
891 "order expression references unknown field '{field}'",
892 ))
893 })?;
894
895 Ok(ResolvedOrderValueSource::direct_field(slot))
896}
897
898fn validate_resolved_order_expr_fields(
899 schema_info: &SchemaInfo,
900 expr: &Expr,
901 rendered: &str,
902) -> Result<(), InternalError> {
903 expr.try_for_each_tree_expr(&mut |node| match node {
904 Expr::Field(field_id) => {
905 resolve_required_schema_slot(schema_info, field_id.as_str(), || {
906 InternalError::query_invalid_logical_plan(format!(
907 "order expression references unknown field '{rendered}'",
908 ))
909 })
910 .map(|_| ())
911 }
912 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
913 #[cfg(test)]
914 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
915 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
916 _ => Ok(()),
917 })
918}
919
920fn resolve_required_schema_slot<F>(
924 schema_info: &SchemaInfo,
925 field: &str,
926 invalid_plan_error: F,
927) -> Result<usize, InternalError>
928where
929 F: FnOnce() -> InternalError,
930{
931 schema_info
932 .field_slot_index(field)
933 .ok_or_else(invalid_plan_error)
934}
935
936fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
939 InternalError::query_invalid_logical_plan(format!(
940 "order expression '{rendered}' did not stay on the scalar expression seam",
941 ))
942}
943
944fn order_referenced_slots_for_resolved_order(
949 resolved_order: Option<&ResolvedOrder>,
950) -> Option<Vec<usize>> {
951 Some(resolved_order?.referenced_slots())
952}
953
954fn slot_map_for_schema_plan(
955 schema_info: &SchemaInfo,
956 plan: &AccessPlannedQuery,
957) -> Option<Vec<usize>> {
958 let executable = plan.access.executable_contract();
959
960 resolved_index_slots_for_access_path(schema_info, &executable)
961}
962
963fn resolved_index_slots_for_access_path(
964 schema_info: &SchemaInfo,
965 access: &ExecutableAccessPlan<'_, crate::value::Value>,
966) -> Option<Vec<usize>> {
967 let path = access.as_path()?;
968 let path_capabilities = path.capabilities();
969 let key_items = path_capabilities.index_key_items_for_slot_map()?;
970 let mut slots = Vec::new();
971
972 match key_items.key_items() {
973 SemanticIndexKeyItemsRef::Fields(fields) => {
974 slots.reserve(fields.len());
975 for field_name in fields {
976 let slot = schema_info.field_slot_index(field_name)?;
977 slots.push(slot);
978 }
979 }
980 SemanticIndexKeyItemsRef::Accepted(items) => {
981 slots.reserve(items.len());
982 for key_item in items {
983 let slot = schema_info.field_slot_index(key_item.as_ref().field())?;
984 slots.push(slot);
985 }
986 }
987 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
988 slots.reserve(fields.len());
989 for &field_name in fields {
990 let slot = schema_info.field_slot_index(field_name)?;
991 slots.push(slot);
992 }
993 }
994 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
995 slots.reserve(items.len());
996 for key_item in items {
997 let slot = schema_info.field_slot_index(key_item.field())?;
998 slots.push(slot);
999 }
1000 }
1001 }
1002
1003 Some(slots)
1004}
1005
1006fn index_compile_targets_for_schema_plan(
1007 schema_info: &SchemaInfo,
1008 plan: &AccessPlannedQuery,
1009) -> Option<Vec<IndexCompileTarget>> {
1010 let executable = plan.access.executable_contract();
1011 let path = executable.as_path()?;
1012 let key_items = path.capabilities().index_key_items_for_slot_map()?;
1013 let mut targets = Vec::new();
1014
1015 match key_items.key_items() {
1016 SemanticIndexKeyItemsRef::Fields(_) | SemanticIndexKeyItemsRef::Accepted(_) => {
1017 return None;
1018 }
1019 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
1020 for (component_index, &field_name) in fields.iter().enumerate() {
1021 let field_slot = schema_info.field_slot_index(field_name)?;
1022 targets.push(IndexCompileTarget {
1023 component_index,
1024 field_slot,
1025 key_item: IndexKeyItem::Field(field_name),
1026 });
1027 }
1028 }
1029 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
1030 for (component_index, &key_item) in items.iter().enumerate() {
1031 let field_slot = schema_info.field_slot_index(key_item.field())?;
1032 targets.push(IndexCompileTarget {
1033 component_index,
1034 field_slot,
1035 key_item,
1036 });
1037 }
1038 }
1039 }
1040
1041 Some(targets)
1042}