1use crate::{
7 db::{
8 access::{AccessPlan, ExecutableAccessPlan},
9 predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10 query::plan::{
11 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15 ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16 derive_logical_pushdown_eligibility,
17 expr::{
18 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19 compile_scalar_projection_plan_with_schema,
20 },
21 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22 grouped_cursor_policy_violation, grouped_plan_strategy,
23 lower_data_row_direct_projection_slots_with_schema,
24 lower_direct_projection_slots_with_schema, lower_projection_identity,
25 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26 residual_query_predicate_after_filtered_access_contract,
27 resolved_grouped_distinct_execution_strategy_with_schema_info,
28 },
29 schema::SchemaInfo,
30 },
31 error::InternalError,
32 model::{entity::EntityModel, index::IndexKeyItemsRef},
33};
34
35impl QueryMode {
36 #[must_use]
38 pub const fn is_load(&self) -> bool {
39 match self {
40 Self::Load(_) => true,
41 Self::Delete(_) => false,
42 }
43 }
44
45 #[must_use]
47 pub const fn is_delete(&self) -> bool {
48 match self {
49 Self::Delete(_) => true,
50 Self::Load(_) => false,
51 }
52 }
53}
54
55impl LogicalPlan {
56 #[must_use]
58 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
59 match self {
60 Self::Scalar(plan) => plan,
61 Self::Grouped(plan) => &plan.scalar,
62 }
63 }
64
65 #[must_use]
67 #[cfg(test)]
68 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
69 match self {
70 Self::Scalar(plan) => plan,
71 Self::Grouped(plan) => &mut plan.scalar,
72 }
73 }
74
75 #[must_use]
77 #[cfg(test)]
78 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
79 self.scalar_semantics()
80 }
81
82 #[must_use]
84 #[cfg(test)]
85 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
86 self.scalar_semantics_mut()
87 }
88}
89
90impl AccessPlannedQuery {
91 #[must_use]
93 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
94 self.logical.scalar_semantics()
95 }
96
97 #[must_use]
100 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
101 self.scalar_plan().consistency
102 }
103
104 #[must_use]
106 #[cfg(test)]
107 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
108 self.logical.scalar_semantics_mut()
109 }
110
111 #[must_use]
113 #[cfg(test)]
114 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
115 self.scalar_plan()
116 }
117
118 #[must_use]
120 #[cfg(test)]
121 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
122 self.scalar_plan_mut()
123 }
124
125 #[must_use]
127 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
128 match &self.logical {
129 LogicalPlan::Scalar(_) => None,
130 LogicalPlan::Grouped(plan) => Some(plan),
131 }
132 }
133
134 #[must_use]
136 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
137 if let Some(static_shape) = &self.static_planning_shape {
138 return static_shape.projection_spec.clone();
139 }
140
141 lower_projection_intent(model, &self.logical, &self.projection_selection)
142 }
143
144 #[must_use]
146 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
147 lower_projection_identity(&self.logical, &self.projection_selection)
148 }
149
150 #[must_use]
156 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
157 if let Some(static_shape) = self.static_planning_shape.as_ref() {
158 return static_shape.execution_preparation_predicate.clone();
159 }
160
161 derive_execution_preparation_predicate(self)
162 }
163
164 #[must_use]
168 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
169 if let Some(static_shape) = self.static_planning_shape.as_ref() {
170 return static_shape.residual_filter_predicate.clone();
171 }
172
173 derive_residual_filter_predicate(self)
174 }
175
176 #[must_use]
179 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
180 self.effective_execution_predicate().is_some()
181 }
182
183 #[must_use]
186 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
187 if let Some(static_shape) = self.static_planning_shape.as_ref() {
188 return static_shape.residual_filter_expr.as_ref();
189 }
190
191 if !derive_has_residual_filter(self) {
192 return None;
193 }
194
195 self.scalar_plan().filter_expr.as_ref()
196 }
197
198 #[must_use]
201 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
202 self.residual_filter_expr().is_some()
203 }
204
205 #[must_use]
207 pub(in crate::db) const fn execution_preparation_compiled_predicate(
208 &self,
209 ) -> Option<&PredicateProgram> {
210 self.static_planning_shape()
211 .execution_preparation_compiled_predicate
212 .as_ref()
213 }
214
215 #[must_use]
217 pub(in crate::db) const fn effective_runtime_compiled_predicate(
218 &self,
219 ) -> Option<&PredicateProgram> {
220 match self
221 .static_planning_shape()
222 .effective_runtime_filter_program
223 .as_ref()
224 {
225 Some(program) => program.predicate_program(),
226 None => None,
227 }
228 }
229
230 #[cfg(test)]
232 #[must_use]
233 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
234 &self,
235 ) -> Option<&CompiledExpr> {
236 match self
237 .static_planning_shape()
238 .effective_runtime_filter_program
239 .as_ref()
240 {
241 Some(program) => program.expression_filter(),
242 None => None,
243 }
244 }
245
246 #[must_use]
248 pub(in crate::db) const fn effective_runtime_filter_program(
249 &self,
250 ) -> Option<&EffectiveRuntimeFilterProgram> {
251 self.static_planning_shape()
252 .effective_runtime_filter_program
253 .as_ref()
254 }
255
256 #[must_use]
258 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
259 if !self.scalar_plan().distinct {
260 return DistinctExecutionStrategy::None;
261 }
262
263 match distinct_runtime_dedup_strategy(&self.access) {
267 Some(strategy) => strategy,
268 None => DistinctExecutionStrategy::None,
269 }
270 }
271
272 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
274 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
275 }
276
277 #[cfg(test)]
279 pub(in crate::db) fn finalize_static_planning_shape_for_model_only(
280 &mut self,
281 model: &EntityModel,
282 ) -> Result<(), InternalError> {
283 self.finalize_static_planning_shape_for_model_with_schema(
284 model,
285 SchemaInfo::cached_for_generated_entity_model(model),
286 )
287 }
288
289 pub(in crate::db) fn finalize_static_planning_shape_for_model_with_schema(
291 &mut self,
292 model: &EntityModel,
293 schema_info: &SchemaInfo,
294 ) -> Result<(), InternalError> {
295 self.static_planning_shape = Some(project_static_planning_shape_for_model(
296 model,
297 schema_info,
298 self,
299 )?);
300
301 Ok(())
302 }
303
304 #[must_use]
306 pub(in crate::db) fn execution_shape_signature(
307 &self,
308 entity_path: &'static str,
309 ) -> ExecutionShapeSignature {
310 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
311 }
312
313 #[must_use]
316 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
317 if let Some(static_shape) = self.static_planning_shape.as_ref() {
318 return self.scalar_plan().predicate.is_some()
319 && static_shape.residual_filter_predicate.is_none()
320 && static_shape.residual_filter_expr.is_none();
321 }
322
323 derive_predicate_fully_satisfied_by_access_contract(self)
324 }
325
326 #[must_use]
328 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
329 self.static_planning_shape()
330 .scalar_projection_plan
331 .as_deref()
332 }
333
334 #[must_use]
336 pub(in crate::db) const fn has_static_planning_shape(&self) -> bool {
337 self.static_planning_shape.is_some()
338 }
339
340 #[must_use]
342 pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
343 self.static_planning_shape().primary_key_name
344 }
345
346 #[must_use]
348 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
349 self.static_planning_shape()
350 .projection_referenced_slots
351 .as_slice()
352 }
353
354 #[must_use]
356 #[cfg(any(test, feature = "diagnostics"))]
357 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
358 self.static_planning_shape().projected_slot_mask.as_slice()
359 }
360
361 #[must_use]
363 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
364 self.static_planning_shape().projection_is_model_identity
365 }
366
367 #[must_use]
369 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
370 self.static_planning_shape()
371 .order_referenced_slots
372 .as_deref()
373 }
374
375 #[must_use]
377 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
378 self.static_planning_shape().resolved_order.as_ref()
379 }
380
381 #[must_use]
383 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
384 self.static_planning_shape().slot_map.as_deref()
385 }
386
387 #[must_use]
389 pub(in crate::db) fn grouped_aggregate_execution_specs(
390 &self,
391 ) -> Option<&[GroupedAggregateExecutionSpec]> {
392 self.static_planning_shape()
393 .grouped_aggregate_execution_specs
394 .as_deref()
395 }
396
397 #[must_use]
399 pub(in crate::db) const fn grouped_distinct_execution_strategy(
400 &self,
401 ) -> Option<&GroupedDistinctExecutionStrategy> {
402 self.static_planning_shape()
403 .grouped_distinct_execution_strategy
404 .as_ref()
405 }
406
407 #[must_use]
409 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
410 &self.static_planning_shape().projection_spec
411 }
412
413 #[must_use]
415 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
416 self.static_planning_shape()
417 .projection_direct_slots
418 .as_deref()
419 }
420
421 #[must_use]
423 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
424 self.static_planning_shape()
425 .projection_data_row_direct_slots
426 .as_deref()
427 }
428
429 #[must_use]
431 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
432 self.static_planning_shape()
433 .index_compile_targets
434 .as_deref()
435 }
436
437 const fn static_planning_shape(&self) -> &StaticPlanningShape {
438 self.static_planning_shape
439 .as_ref()
440 .expect("access-planned queries must freeze static planning shape before execution")
441 }
442}
443
444fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
445 match access {
446 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
447 Some(DistinctExecutionStrategy::PreOrdered)
448 }
449 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
450 Some(DistinctExecutionStrategy::HashMaterialize)
451 }
452 AccessPlan::Path(_) => None,
453 }
454}
455
456fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
457 let is_grouped_safe = plan
458 .grouped_plan()
459 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
460
461 ContinuationPolicy::new(
462 true, true, is_grouped_safe,
465 )
466}
467
468#[must_use]
470pub(in crate::db) fn project_planner_route_profile_for_model(
471 model: &EntityModel,
472 plan: &AccessPlannedQuery,
473) -> PlannerRouteProfile {
474 let secondary_order_contract = plan
475 .scalar_plan()
476 .order
477 .as_ref()
478 .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
479
480 PlannerRouteProfile::new(
481 derive_continuation_policy_validated(plan),
482 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
483 secondary_order_contract,
484 )
485}
486
487fn project_static_planning_shape_for_model(
488 model: &EntityModel,
489 schema_info: &SchemaInfo,
490 plan: &AccessPlannedQuery,
491) -> Result<StaticPlanningShape, InternalError> {
492 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
493 let execution_preparation_predicate = plan.execution_preparation_predicate();
494 let residual_filter_predicate = derive_residual_filter_predicate(plan);
495 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
496 let execution_preparation_compiled_predicate =
497 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
498 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
499 schema_info,
500 residual_filter_expr.as_ref(),
501 residual_filter_predicate.as_ref(),
502 )?;
503 let scalar_projection_plan = if plan.grouped_plan().is_none() {
504 Some(
505 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
506 .ok_or_else(|| {
507 InternalError::query_executor_invariant(
508 "scalar projection program must compile during static planning finalization",
509 )
510 })?
511 .iter()
512 .map(CompiledExpr::compile)
513 .collect(),
514 )
515 } else {
516 None
517 };
518 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
519 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
520 let projection_direct_slots = lower_direct_projection_slots_with_schema(
521 model,
522 schema_info,
523 &plan.logical,
524 &plan.projection_selection,
525 );
526 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
527 model,
528 schema_info,
529 &plan.logical,
530 &plan.projection_selection,
531 );
532 let projection_referenced_slots =
533 projection_spec.referenced_slots_for_schema(model, schema_info)?;
534 let projected_slot_mask =
535 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
536 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
537 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
538 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
539 let slot_map = slot_map_for_schema_plan(schema_info, plan);
540 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
541
542 Ok(StaticPlanningShape {
543 primary_key_name: model.primary_key.name,
544 projection_spec,
545 execution_preparation_predicate,
546 residual_filter_expr,
547 residual_filter_predicate,
548 execution_preparation_compiled_predicate,
549 effective_runtime_filter_program,
550 scalar_projection_plan,
551 grouped_aggregate_execution_specs,
552 grouped_distinct_execution_strategy,
553 projection_direct_slots,
554 projection_data_row_direct_slots,
555 projection_referenced_slots,
556 projected_slot_mask,
557 projection_is_model_identity,
558 resolved_order,
559 order_referenced_slots,
560 slot_map,
561 index_compile_targets,
562 })
563}
564
565fn compile_effective_runtime_filter_program(
569 schema_info: &SchemaInfo,
570 residual_filter_expr: Option<&Expr>,
571 residual_filter_predicate: Option<&Predicate>,
572) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
573 if let Some(predicate) = residual_filter_predicate {
578 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
579 PredicateProgram::compile_with_schema_info(schema_info, predicate),
580 )));
581 }
582
583 if let Some(filter_expr) = residual_filter_expr {
584 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
585 .ok_or_else(|| {
586 InternalError::query_invalid_logical_plan(
587 "effective runtime scalar filter expression must compile during static planning finalization",
588 )
589 })?;
590
591 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
592 CompiledExpr::compile(&compiled),
593 )));
594 }
595
596 Ok(None)
597}
598
599fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
603 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
604
605 match plan.access.selected_index_contract() {
606 Some(index) => {
607 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
608 }
609 None => Some(query_predicate.clone()),
610 }
611}
612
613fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
617 let filtered_residual = derive_execution_preparation_predicate(plan);
618 let filtered_residual = filtered_residual.as_ref()?;
619
620 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
621}
622
623fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
627 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
628 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
629 return None;
630 }
631
632 Some(filter_expr.clone())
633}
634
635fn derive_residual_filter_expr_for_model(
639 model: &EntityModel,
640 plan: &AccessPlannedQuery,
641) -> Option<Expr> {
642 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
643 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
644 return None;
645 }
646
647 Some(filter_expr.clone())
648}
649
650fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
654 match (
655 plan.scalar_plan().filter_expr.as_ref(),
656 plan.scalar_plan().predicate.as_ref(),
657 ) {
658 (None, None) => false,
659 (Some(_), None) => true,
660 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
661 }
662}
663
664fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
667 plan.scalar_plan().predicate.is_some()
668 && derive_residual_filter_predicate(plan).is_none()
669 && derive_residual_filter_expr(plan).is_none()
670}
671
672const fn derive_semantic_filter_fully_satisfied_by_access_contract(
676 plan: &AccessPlannedQuery,
677) -> bool {
678 plan.scalar_plan().filter_expr.is_some()
679 && plan.scalar_plan().predicate.is_some()
680 && plan.scalar_plan().predicate_covers_filter_expr
681}
682
683const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
687 _model: &EntityModel,
688 plan: &AccessPlannedQuery,
689) -> bool {
690 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
691}
692
693fn compile_optional_predicate(
696 schema_info: &SchemaInfo,
697 predicate: Option<&Predicate>,
698) -> Option<PredicateProgram> {
699 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
700}
701
702fn resolve_grouped_static_planning_semantics(
706 schema_info: &SchemaInfo,
707 plan: &AccessPlannedQuery,
708 projection_spec: &ProjectionSpec,
709) -> Result<
710 (
711 Option<Vec<GroupedAggregateExecutionSpec>>,
712 Option<GroupedDistinctExecutionStrategy>,
713 ),
714 InternalError,
715> {
716 let Some(grouped) = plan.grouped_plan() else {
717 return Ok((None, None));
718 };
719
720 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
721 projection_spec,
722 grouped.group.group_fields.as_slice(),
723 grouped.group.aggregates.as_slice(),
724 )?;
725 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
726
727 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
728 schema_info,
729 aggregate_specs.as_slice(),
730 )?);
731 let grouped_distinct_execution_strategy = Some(
732 resolved_grouped_distinct_execution_strategy_with_schema_info(
733 schema_info,
734 grouped.group.group_fields.as_slice(),
735 grouped.group.aggregates.as_slice(),
736 grouped.having_expr.as_ref(),
737 )?,
738 );
739
740 Ok((
741 grouped_aggregate_execution_specs,
742 grouped_distinct_execution_strategy,
743 ))
744}
745
746fn extend_grouped_having_aggregate_specs(
747 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
748 grouped: &GroupPlan,
749) -> Result<(), InternalError> {
750 if let Some(having_expr) = grouped.having_expr.as_ref() {
751 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
752 }
753
754 Ok(())
755}
756
757fn collect_grouped_having_expr_aggregate_specs(
758 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
759 expr: &Expr,
760) -> Result<(), InternalError> {
761 if !expr.contains_aggregate() {
762 return Ok(());
763 }
764
765 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
766 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
767
768 if aggregate_specs
769 .iter()
770 .all(|current| current != &aggregate_spec)
771 {
772 aggregate_specs.push(aggregate_spec);
773 }
774
775 Ok(())
776 })
777}
778
779fn projected_slot_mask_for_spec(
780 model: &EntityModel,
781 direct_projection_slots: Option<&[usize]>,
782) -> Vec<bool> {
783 let schema_slot_len = direct_projection_slots
784 .and_then(|slots| slots.iter().copied().max())
785 .map_or(0, |slot| slot.saturating_add(1));
786 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
787
788 let Some(direct_projection_slots) = direct_projection_slots else {
789 return projected_slots;
790 };
791
792 for slot in direct_projection_slots.iter().copied() {
793 if let Some(projected) = projected_slots.get_mut(slot) {
794 *projected = true;
795 }
796 }
797
798 projected_slots
799}
800
801fn resolved_order_for_plan(
802 schema_info: &SchemaInfo,
803 plan: &AccessPlannedQuery,
804) -> Result<Option<ResolvedOrder>, InternalError> {
805 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
806 return Ok(None);
807 }
808
809 let Some(order) = plan.scalar_plan().order.as_ref() else {
810 return Ok(None);
811 };
812
813 let mut fields = Vec::with_capacity(order.fields.len());
814 for term in &order.fields {
815 fields.push(ResolvedOrderField::new(
816 resolved_order_value_source_for_term(schema_info, term)?,
817 term.direction(),
818 ));
819 }
820
821 Ok(Some(ResolvedOrder::new(fields)))
822}
823
824fn resolved_order_value_source_for_term(
825 schema_info: &SchemaInfo,
826 term: &crate::db::query::plan::OrderTerm,
827) -> Result<ResolvedOrderValueSource, InternalError> {
828 if term.direct_field().is_none() {
829 let rendered = term.rendered_label();
830 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
831 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
832 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
833
834 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
835 &compiled,
836 )));
837 }
838
839 let field = term
840 .direct_field()
841 .expect("direct-field order branch should only execute for field-backed terms");
842 let slot = resolve_required_schema_slot(schema_info, field, || {
843 InternalError::query_invalid_logical_plan(format!(
844 "order expression references unknown field '{field}'",
845 ))
846 })?;
847
848 Ok(ResolvedOrderValueSource::direct_field(slot))
849}
850
851fn validate_resolved_order_expr_fields(
852 schema_info: &SchemaInfo,
853 expr: &Expr,
854 rendered: &str,
855) -> Result<(), InternalError> {
856 expr.try_for_each_tree_expr(&mut |node| match node {
857 Expr::Field(field_id) => {
858 resolve_required_schema_slot(schema_info, field_id.as_str(), || {
859 InternalError::query_invalid_logical_plan(format!(
860 "order expression references unknown field '{rendered}'",
861 ))
862 })
863 .map(|_| ())
864 }
865 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
866 #[cfg(test)]
867 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
868 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
869 _ => Ok(()),
870 })
871}
872
873fn resolve_required_schema_slot<F>(
877 schema_info: &SchemaInfo,
878 field: &str,
879 invalid_plan_error: F,
880) -> Result<usize, InternalError>
881where
882 F: FnOnce() -> InternalError,
883{
884 schema_info
885 .field_slot_index(field)
886 .ok_or_else(invalid_plan_error)
887}
888
889fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
892 InternalError::query_invalid_logical_plan(format!(
893 "order expression '{rendered}' did not stay on the scalar expression seam",
894 ))
895}
896
897fn order_referenced_slots_for_resolved_order(
902 resolved_order: Option<&ResolvedOrder>,
903) -> Option<Vec<usize>> {
904 Some(resolved_order?.referenced_slots())
905}
906
907fn slot_map_for_schema_plan(
908 schema_info: &SchemaInfo,
909 plan: &AccessPlannedQuery,
910) -> Option<Vec<usize>> {
911 let executable = plan.access.executable_contract();
912
913 resolved_index_slots_for_access_path(schema_info, &executable)
914}
915
916fn resolved_index_slots_for_access_path(
917 schema_info: &SchemaInfo,
918 access: &ExecutableAccessPlan<'_, crate::value::Value>,
919) -> Option<Vec<usize>> {
920 let path = access.as_path()?;
921 let path_capabilities = path.capabilities();
922 let key_items = path_capabilities.index_key_items_for_slot_map()?;
923 let mut slots = Vec::new();
924
925 match key_items {
926 IndexKeyItemsRef::Fields(fields) => {
927 slots.reserve(fields.len());
928 for &field_name in fields {
929 let slot = schema_info.field_slot_index(field_name)?;
930 slots.push(slot);
931 }
932 }
933 IndexKeyItemsRef::Items(items) => {
934 slots.reserve(items.len());
935 for key_item in items {
936 let slot = schema_info.field_slot_index(key_item.field())?;
937 slots.push(slot);
938 }
939 }
940 }
941
942 Some(slots)
943}
944
945fn index_compile_targets_for_schema_plan(
946 schema_info: &SchemaInfo,
947 plan: &AccessPlannedQuery,
948) -> Option<Vec<IndexCompileTarget>> {
949 let executable = plan.access.executable_contract();
950 let path = executable.as_path()?;
951 let key_items = path.capabilities().index_key_items_for_slot_map()?;
952 let mut targets = Vec::new();
953
954 match key_items {
955 IndexKeyItemsRef::Fields(fields) => {
956 for (component_index, &field_name) in fields.iter().enumerate() {
957 let field_slot = schema_info.field_slot_index(field_name)?;
958 targets.push(IndexCompileTarget {
959 component_index,
960 field_slot,
961 key_item: crate::model::index::IndexKeyItem::Field(field_name),
962 });
963 }
964 }
965 IndexKeyItemsRef::Items(items) => {
966 for (component_index, &key_item) in items.iter().enumerate() {
967 let field_slot = schema_info.field_slot_index(key_item.field())?;
968 targets.push(IndexCompileTarget {
969 component_index,
970 field_slot,
971 key_item,
972 });
973 }
974 }
975 }
976
977 Some(targets)
978}