1use crate::{
7 db::{
8 access::{AccessPlan, ExecutableAccessPlan, SemanticIndexKeyItemsRef},
9 predicate::{IndexCompileTarget, MissingRowPolicy, Predicate, PredicateProgram},
10 query::plan::{
11 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15 ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16 derive_logical_pushdown_eligibility,
17 expr::{
18 CompiledExpr, Expr, ProjectionSpec, compile_scalar_projection_expr_with_schema,
19 compile_scalar_projection_plan_with_schema,
20 },
21 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
22 grouped_cursor_policy_violation, grouped_plan_strategy,
23 lower_data_row_direct_projection_slots_with_schema,
24 lower_direct_projection_slots_with_schema, lower_projection_identity,
25 lower_projection_intent, residual_query_predicate_after_access_path_bounds,
26 residual_query_predicate_after_filtered_access_contract,
27 resolved_grouped_distinct_execution_strategy_with_schema_info,
28 },
29 schema::SchemaInfo,
30 },
31 error::InternalError,
32 model::{
33 entity::EntityModel,
34 index::{IndexKeyItem, IndexKeyItemsRef},
35 },
36};
37
38impl QueryMode {
39 #[must_use]
41 pub const fn is_load(&self) -> bool {
42 match self {
43 Self::Load(_) => true,
44 Self::Delete(_) => false,
45 }
46 }
47
48 #[must_use]
50 pub const fn is_delete(&self) -> bool {
51 match self {
52 Self::Delete(_) => true,
53 Self::Load(_) => false,
54 }
55 }
56}
57
58impl LogicalPlan {
59 #[must_use]
61 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
62 match self {
63 Self::Scalar(plan) => plan,
64 Self::Grouped(plan) => &plan.scalar,
65 }
66 }
67
68 #[must_use]
70 #[cfg(test)]
71 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
72 match self {
73 Self::Scalar(plan) => plan,
74 Self::Grouped(plan) => &mut plan.scalar,
75 }
76 }
77
78 #[must_use]
80 #[cfg(test)]
81 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
82 self.scalar_semantics()
83 }
84
85 #[must_use]
87 #[cfg(test)]
88 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
89 self.scalar_semantics_mut()
90 }
91}
92
93impl AccessPlannedQuery {
94 #[must_use]
96 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
97 self.logical.scalar_semantics()
98 }
99
100 #[must_use]
103 pub(in crate::db) const fn scalar_consistency(&self) -> MissingRowPolicy {
104 self.scalar_plan().consistency
105 }
106
107 #[must_use]
109 #[cfg(test)]
110 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
111 self.logical.scalar_semantics_mut()
112 }
113
114 #[must_use]
116 #[cfg(test)]
117 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
118 self.scalar_plan()
119 }
120
121 #[must_use]
123 #[cfg(test)]
124 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
125 self.scalar_plan_mut()
126 }
127
128 #[must_use]
130 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
131 match &self.logical {
132 LogicalPlan::Scalar(_) => None,
133 LogicalPlan::Grouped(plan) => Some(plan),
134 }
135 }
136
137 #[must_use]
139 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
140 if let Some(static_shape) = &self.static_planning_shape {
141 return static_shape.projection_spec.clone();
142 }
143
144 lower_projection_intent(model, &self.logical, &self.projection_selection)
145 }
146
147 #[must_use]
149 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
150 lower_projection_identity(&self.logical, &self.projection_selection)
151 }
152
153 #[must_use]
159 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
160 if let Some(static_shape) = self.static_planning_shape.as_ref() {
161 return static_shape.execution_preparation_predicate.clone();
162 }
163
164 derive_execution_preparation_predicate(self)
165 }
166
167 #[must_use]
171 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
172 if let Some(static_shape) = self.static_planning_shape.as_ref() {
173 return static_shape.residual_filter_predicate.clone();
174 }
175
176 derive_residual_filter_predicate(self)
177 }
178
179 #[must_use]
182 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
183 self.effective_execution_predicate().is_some()
184 }
185
186 #[must_use]
189 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
190 if let Some(static_shape) = self.static_planning_shape.as_ref() {
191 return static_shape.residual_filter_expr.as_ref();
192 }
193
194 if !derive_has_residual_filter(self) {
195 return None;
196 }
197
198 self.scalar_plan().filter_expr.as_ref()
199 }
200
201 #[must_use]
204 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
205 self.residual_filter_expr().is_some()
206 }
207
208 #[must_use]
210 pub(in crate::db) const fn execution_preparation_compiled_predicate(
211 &self,
212 ) -> Option<&PredicateProgram> {
213 self.static_planning_shape()
214 .execution_preparation_compiled_predicate
215 .as_ref()
216 }
217
218 #[must_use]
220 pub(in crate::db) const fn effective_runtime_compiled_predicate(
221 &self,
222 ) -> Option<&PredicateProgram> {
223 match self
224 .static_planning_shape()
225 .effective_runtime_filter_program
226 .as_ref()
227 {
228 Some(program) => program.predicate_program(),
229 None => None,
230 }
231 }
232
233 #[cfg(test)]
235 #[must_use]
236 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
237 &self,
238 ) -> Option<&CompiledExpr> {
239 match self
240 .static_planning_shape()
241 .effective_runtime_filter_program
242 .as_ref()
243 {
244 Some(program) => program.expression_filter(),
245 None => None,
246 }
247 }
248
249 #[must_use]
251 pub(in crate::db) const fn effective_runtime_filter_program(
252 &self,
253 ) -> Option<&EffectiveRuntimeFilterProgram> {
254 self.static_planning_shape()
255 .effective_runtime_filter_program
256 .as_ref()
257 }
258
259 #[must_use]
261 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
262 if !self.scalar_plan().distinct {
263 return DistinctExecutionStrategy::None;
264 }
265
266 match distinct_runtime_dedup_strategy(&self.access) {
270 Some(strategy) => strategy,
271 None => DistinctExecutionStrategy::None,
272 }
273 }
274
275 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
277 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
278 }
279
280 #[cfg(test)]
282 pub(in crate::db) fn finalize_static_planning_shape_for_model_only(
283 &mut self,
284 model: &EntityModel,
285 ) -> Result<(), InternalError> {
286 self.finalize_static_planning_shape_for_model_with_schema(
287 model,
288 SchemaInfo::cached_for_generated_entity_model(model),
289 )
290 }
291
292 pub(in crate::db) fn finalize_static_planning_shape_for_model_with_schema(
294 &mut self,
295 model: &EntityModel,
296 schema_info: &SchemaInfo,
297 ) -> Result<(), InternalError> {
298 self.static_planning_shape = Some(project_static_planning_shape_for_model(
299 model,
300 schema_info,
301 self,
302 )?);
303
304 Ok(())
305 }
306
307 #[must_use]
309 pub(in crate::db) fn execution_shape_signature(
310 &self,
311 entity_path: &'static str,
312 ) -> ExecutionShapeSignature {
313 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
314 }
315
316 #[must_use]
319 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
320 if let Some(static_shape) = self.static_planning_shape.as_ref() {
321 return self.scalar_plan().predicate.is_some()
322 && static_shape.residual_filter_predicate.is_none()
323 && static_shape.residual_filter_expr.is_none();
324 }
325
326 derive_predicate_fully_satisfied_by_access_contract(self)
327 }
328
329 #[must_use]
331 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[CompiledExpr]> {
332 self.static_planning_shape()
333 .scalar_projection_plan
334 .as_deref()
335 }
336
337 #[must_use]
339 pub(in crate::db) const fn has_static_planning_shape(&self) -> bool {
340 self.static_planning_shape.is_some()
341 }
342
343 #[must_use]
345 pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
346 self.static_planning_shape().primary_key_name
347 }
348
349 #[must_use]
351 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
352 self.static_planning_shape()
353 .projection_referenced_slots
354 .as_slice()
355 }
356
357 #[must_use]
359 #[cfg(any(test, feature = "diagnostics"))]
360 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
361 self.static_planning_shape().projected_slot_mask.as_slice()
362 }
363
364 #[must_use]
366 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
367 self.static_planning_shape().projection_is_model_identity
368 }
369
370 #[must_use]
372 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
373 self.static_planning_shape()
374 .order_referenced_slots
375 .as_deref()
376 }
377
378 #[must_use]
380 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
381 self.static_planning_shape().resolved_order.as_ref()
382 }
383
384 #[must_use]
386 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
387 self.static_planning_shape().slot_map.as_deref()
388 }
389
390 #[must_use]
392 pub(in crate::db) fn grouped_aggregate_execution_specs(
393 &self,
394 ) -> Option<&[GroupedAggregateExecutionSpec]> {
395 self.static_planning_shape()
396 .grouped_aggregate_execution_specs
397 .as_deref()
398 }
399
400 #[must_use]
402 pub(in crate::db) const fn grouped_distinct_execution_strategy(
403 &self,
404 ) -> Option<&GroupedDistinctExecutionStrategy> {
405 self.static_planning_shape()
406 .grouped_distinct_execution_strategy
407 .as_ref()
408 }
409
410 #[must_use]
412 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
413 &self.static_planning_shape().projection_spec
414 }
415
416 #[must_use]
418 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
419 self.static_planning_shape()
420 .projection_direct_slots
421 .as_deref()
422 }
423
424 #[must_use]
426 pub(in crate::db) fn frozen_data_row_direct_projection_slots(&self) -> Option<&[usize]> {
427 self.static_planning_shape()
428 .projection_data_row_direct_slots
429 .as_deref()
430 }
431
432 #[must_use]
434 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
435 self.static_planning_shape()
436 .index_compile_targets
437 .as_deref()
438 }
439
440 const fn static_planning_shape(&self) -> &StaticPlanningShape {
441 self.static_planning_shape
442 .as_ref()
443 .expect("access-planned queries must freeze static planning shape before execution")
444 }
445}
446
447fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
448 match access {
449 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
450 Some(DistinctExecutionStrategy::PreOrdered)
451 }
452 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
453 Some(DistinctExecutionStrategy::HashMaterialize)
454 }
455 AccessPlan::Path(_) => None,
456 }
457}
458
459fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
460 let is_grouped_safe = plan
461 .grouped_plan()
462 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
463
464 ContinuationPolicy::new(
465 true, true, is_grouped_safe,
468 )
469}
470
471#[must_use]
473pub(in crate::db) fn project_planner_route_profile_for_model(
474 model: &EntityModel,
475 plan: &AccessPlannedQuery,
476) -> PlannerRouteProfile {
477 let secondary_order_contract = plan
478 .scalar_plan()
479 .order
480 .as_ref()
481 .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
482
483 PlannerRouteProfile::new(
484 derive_continuation_policy_validated(plan),
485 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
486 secondary_order_contract,
487 )
488}
489
490fn project_static_planning_shape_for_model(
491 model: &EntityModel,
492 schema_info: &SchemaInfo,
493 plan: &AccessPlannedQuery,
494) -> Result<StaticPlanningShape, InternalError> {
495 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
496 let execution_preparation_predicate = plan.execution_preparation_predicate();
497 let residual_filter_predicate = derive_residual_filter_predicate(plan);
498 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
499 let execution_preparation_compiled_predicate =
500 compile_optional_predicate(schema_info, execution_preparation_predicate.as_ref());
501 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
502 schema_info,
503 residual_filter_expr.as_ref(),
504 residual_filter_predicate.as_ref(),
505 )?;
506 let scalar_projection_plan = if plan.grouped_plan().is_none() {
507 Some(
508 compile_scalar_projection_plan_with_schema(schema_info, &projection_spec)
509 .ok_or_else(|| {
510 InternalError::query_executor_invariant(
511 "scalar projection program must compile during static planning finalization",
512 )
513 })?
514 .iter()
515 .map(CompiledExpr::compile)
516 .collect(),
517 )
518 } else {
519 None
520 };
521 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
522 resolve_grouped_static_planning_semantics(schema_info, plan, &projection_spec)?;
523 let projection_direct_slots = lower_direct_projection_slots_with_schema(
524 model,
525 schema_info,
526 &plan.logical,
527 &plan.projection_selection,
528 );
529 let projection_data_row_direct_slots = lower_data_row_direct_projection_slots_with_schema(
530 model,
531 schema_info,
532 &plan.logical,
533 &plan.projection_selection,
534 );
535 let projection_referenced_slots =
536 projection_spec.referenced_slots_for_schema(model, schema_info)?;
537 let projected_slot_mask =
538 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
539 let projection_is_model_identity = projection_spec.is_model_identity_for(model);
540 let resolved_order = resolved_order_for_plan(schema_info, plan)?;
541 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
542 let slot_map = slot_map_for_schema_plan(schema_info, plan);
543 let index_compile_targets = index_compile_targets_for_schema_plan(schema_info, plan);
544
545 Ok(StaticPlanningShape {
546 primary_key_name: model.primary_key.name,
547 projection_spec,
548 execution_preparation_predicate,
549 residual_filter_expr,
550 residual_filter_predicate,
551 execution_preparation_compiled_predicate,
552 effective_runtime_filter_program,
553 scalar_projection_plan,
554 grouped_aggregate_execution_specs,
555 grouped_distinct_execution_strategy,
556 projection_direct_slots,
557 projection_data_row_direct_slots,
558 projection_referenced_slots,
559 projected_slot_mask,
560 projection_is_model_identity,
561 resolved_order,
562 order_referenced_slots,
563 slot_map,
564 index_compile_targets,
565 })
566}
567
568fn compile_effective_runtime_filter_program(
572 schema_info: &SchemaInfo,
573 residual_filter_expr: Option<&Expr>,
574 residual_filter_predicate: Option<&Predicate>,
575) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
576 if let Some(predicate) = residual_filter_predicate {
581 return Ok(Some(EffectiveRuntimeFilterProgram::predicate(
582 PredicateProgram::compile_with_schema_info(schema_info, predicate),
583 )));
584 }
585
586 if let Some(filter_expr) = residual_filter_expr {
587 let compiled = compile_scalar_projection_expr_with_schema(schema_info, filter_expr)
588 .ok_or_else(|| {
589 InternalError::query_invalid_logical_plan(
590 "effective runtime scalar filter expression must compile during static planning finalization",
591 )
592 })?;
593
594 return Ok(Some(EffectiveRuntimeFilterProgram::expression(
595 CompiledExpr::compile(&compiled),
596 )));
597 }
598
599 Ok(None)
600}
601
602fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
606 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
607
608 match plan.access.selected_index_contract() {
609 Some(index) => {
610 residual_query_predicate_after_filtered_access_contract(index, query_predicate)
611 }
612 None => Some(query_predicate.clone()),
613 }
614}
615
616fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
620 let filtered_residual = derive_execution_preparation_predicate(plan);
621 let filtered_residual = filtered_residual.as_ref()?;
622
623 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
624}
625
626fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
630 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
631 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
632 return None;
633 }
634
635 Some(filter_expr.clone())
636}
637
638fn derive_residual_filter_expr_for_model(
642 model: &EntityModel,
643 plan: &AccessPlannedQuery,
644) -> Option<Expr> {
645 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
646 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
647 return None;
648 }
649
650 Some(filter_expr.clone())
651}
652
653fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
657 match (
658 plan.scalar_plan().filter_expr.as_ref(),
659 plan.scalar_plan().predicate.as_ref(),
660 ) {
661 (None, None) => false,
662 (Some(_), None) => true,
663 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
664 }
665}
666
667fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
670 plan.scalar_plan().predicate.is_some()
671 && derive_residual_filter_predicate(plan).is_none()
672 && derive_residual_filter_expr(plan).is_none()
673}
674
675const fn derive_semantic_filter_fully_satisfied_by_access_contract(
679 plan: &AccessPlannedQuery,
680) -> bool {
681 plan.scalar_plan().filter_expr.is_some()
682 && plan.scalar_plan().predicate.is_some()
683 && plan.scalar_plan().predicate_covers_filter_expr
684}
685
686const fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
690 _model: &EntityModel,
691 plan: &AccessPlannedQuery,
692) -> bool {
693 derive_semantic_filter_fully_satisfied_by_access_contract(plan)
694}
695
696fn compile_optional_predicate(
699 schema_info: &SchemaInfo,
700 predicate: Option<&Predicate>,
701) -> Option<PredicateProgram> {
702 predicate.map(|predicate| PredicateProgram::compile_with_schema_info(schema_info, predicate))
703}
704
705fn resolve_grouped_static_planning_semantics(
709 schema_info: &SchemaInfo,
710 plan: &AccessPlannedQuery,
711 projection_spec: &ProjectionSpec,
712) -> Result<
713 (
714 Option<Vec<GroupedAggregateExecutionSpec>>,
715 Option<GroupedDistinctExecutionStrategy>,
716 ),
717 InternalError,
718> {
719 let Some(grouped) = plan.grouped_plan() else {
720 return Ok((None, None));
721 };
722
723 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
724 projection_spec,
725 grouped.group.group_fields.as_slice(),
726 grouped.group.aggregates.as_slice(),
727 )?;
728 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
729
730 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
731 schema_info,
732 aggregate_specs.as_slice(),
733 )?);
734 let grouped_distinct_execution_strategy = Some(
735 resolved_grouped_distinct_execution_strategy_with_schema_info(
736 schema_info,
737 grouped.group.group_fields.as_slice(),
738 grouped.group.aggregates.as_slice(),
739 grouped.having_expr.as_ref(),
740 )?,
741 );
742
743 Ok((
744 grouped_aggregate_execution_specs,
745 grouped_distinct_execution_strategy,
746 ))
747}
748
749fn extend_grouped_having_aggregate_specs(
750 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
751 grouped: &GroupPlan,
752) -> Result<(), InternalError> {
753 if let Some(having_expr) = grouped.having_expr.as_ref() {
754 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
755 }
756
757 Ok(())
758}
759
760fn collect_grouped_having_expr_aggregate_specs(
761 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
762 expr: &Expr,
763) -> Result<(), InternalError> {
764 if !expr.contains_aggregate() {
765 return Ok(());
766 }
767
768 expr.try_for_each_tree_aggregate(&mut |aggregate_expr| {
769 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
770
771 if aggregate_specs
772 .iter()
773 .all(|current| current != &aggregate_spec)
774 {
775 aggregate_specs.push(aggregate_spec);
776 }
777
778 Ok(())
779 })
780}
781
782fn projected_slot_mask_for_spec(
783 model: &EntityModel,
784 direct_projection_slots: Option<&[usize]>,
785) -> Vec<bool> {
786 let schema_slot_len = direct_projection_slots
787 .and_then(|slots| slots.iter().copied().max())
788 .map_or(0, |slot| slot.saturating_add(1));
789 let mut projected_slots = vec![false; model.fields().len().max(schema_slot_len)];
790
791 let Some(direct_projection_slots) = direct_projection_slots else {
792 return projected_slots;
793 };
794
795 for slot in direct_projection_slots.iter().copied() {
796 if let Some(projected) = projected_slots.get_mut(slot) {
797 *projected = true;
798 }
799 }
800
801 projected_slots
802}
803
804fn resolved_order_for_plan(
805 schema_info: &SchemaInfo,
806 plan: &AccessPlannedQuery,
807) -> Result<Option<ResolvedOrder>, InternalError> {
808 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
809 return Ok(None);
810 }
811
812 let Some(order) = plan.scalar_plan().order.as_ref() else {
813 return Ok(None);
814 };
815
816 let mut fields = Vec::with_capacity(order.fields.len());
817 for term in &order.fields {
818 fields.push(ResolvedOrderField::new(
819 resolved_order_value_source_for_term(schema_info, term)?,
820 term.direction(),
821 ));
822 }
823
824 Ok(Some(ResolvedOrder::new(fields)))
825}
826
827fn resolved_order_value_source_for_term(
828 schema_info: &SchemaInfo,
829 term: &crate::db::query::plan::OrderTerm,
830) -> Result<ResolvedOrderValueSource, InternalError> {
831 if term.direct_field().is_none() {
832 let rendered = term.rendered_label();
833 validate_resolved_order_expr_fields(schema_info, term.expr(), rendered.as_str())?;
834 let compiled = compile_scalar_projection_expr_with_schema(schema_info, term.expr())
835 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
836
837 return Ok(ResolvedOrderValueSource::expression(CompiledExpr::compile(
838 &compiled,
839 )));
840 }
841
842 let field = term
843 .direct_field()
844 .expect("direct-field order branch should only execute for field-backed terms");
845 let slot = resolve_required_schema_slot(schema_info, field, || {
846 InternalError::query_invalid_logical_plan(format!(
847 "order expression references unknown field '{field}'",
848 ))
849 })?;
850
851 Ok(ResolvedOrderValueSource::direct_field(slot))
852}
853
854fn validate_resolved_order_expr_fields(
855 schema_info: &SchemaInfo,
856 expr: &Expr,
857 rendered: &str,
858) -> Result<(), InternalError> {
859 expr.try_for_each_tree_expr(&mut |node| match node {
860 Expr::Field(field_id) => {
861 resolve_required_schema_slot(schema_info, field_id.as_str(), || {
862 InternalError::query_invalid_logical_plan(format!(
863 "order expression references unknown field '{rendered}'",
864 ))
865 })
866 .map(|_| ())
867 }
868 Expr::Aggregate(_) => Err(order_expression_scalar_seam_error(rendered)),
869 #[cfg(test)]
870 Expr::Alias { .. } => Err(order_expression_scalar_seam_error(rendered)),
871 Expr::Unary { .. } => Err(order_expression_scalar_seam_error(rendered)),
872 _ => Ok(()),
873 })
874}
875
876fn resolve_required_schema_slot<F>(
880 schema_info: &SchemaInfo,
881 field: &str,
882 invalid_plan_error: F,
883) -> Result<usize, InternalError>
884where
885 F: FnOnce() -> InternalError,
886{
887 schema_info
888 .field_slot_index(field)
889 .ok_or_else(invalid_plan_error)
890}
891
892fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
895 InternalError::query_invalid_logical_plan(format!(
896 "order expression '{rendered}' did not stay on the scalar expression seam",
897 ))
898}
899
900fn order_referenced_slots_for_resolved_order(
905 resolved_order: Option<&ResolvedOrder>,
906) -> Option<Vec<usize>> {
907 Some(resolved_order?.referenced_slots())
908}
909
910fn slot_map_for_schema_plan(
911 schema_info: &SchemaInfo,
912 plan: &AccessPlannedQuery,
913) -> Option<Vec<usize>> {
914 let executable = plan.access.executable_contract();
915
916 resolved_index_slots_for_access_path(schema_info, &executable)
917}
918
919fn resolved_index_slots_for_access_path(
920 schema_info: &SchemaInfo,
921 access: &ExecutableAccessPlan<'_, crate::value::Value>,
922) -> Option<Vec<usize>> {
923 let path = access.as_path()?;
924 let path_capabilities = path.capabilities();
925 let key_items = path_capabilities.index_key_items_for_slot_map()?;
926 let mut slots = Vec::new();
927
928 match key_items.key_items() {
929 SemanticIndexKeyItemsRef::Fields(fields) => {
930 slots.reserve(fields.len());
931 for field_name in fields {
932 let slot = schema_info.field_slot_index(field_name)?;
933 slots.push(slot);
934 }
935 }
936 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
937 slots.reserve(fields.len());
938 for &field_name in fields {
939 let slot = schema_info.field_slot_index(field_name)?;
940 slots.push(slot);
941 }
942 }
943 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
944 slots.reserve(items.len());
945 for key_item in items {
946 let slot = schema_info.field_slot_index(key_item.field())?;
947 slots.push(slot);
948 }
949 }
950 }
951
952 Some(slots)
953}
954
955fn index_compile_targets_for_schema_plan(
956 schema_info: &SchemaInfo,
957 plan: &AccessPlannedQuery,
958) -> Option<Vec<IndexCompileTarget>> {
959 let executable = plan.access.executable_contract();
960 let path = executable.as_path()?;
961 let key_items = path.capabilities().index_key_items_for_slot_map()?;
962 let mut targets = Vec::new();
963
964 match key_items.key_items() {
965 SemanticIndexKeyItemsRef::Fields(_fields) => return None,
966 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Fields(fields)) => {
967 for (component_index, &field_name) in fields.iter().enumerate() {
968 let field_slot = schema_info.field_slot_index(field_name)?;
969 targets.push(IndexCompileTarget {
970 component_index,
971 field_slot,
972 key_item: IndexKeyItem::Field(field_name),
973 });
974 }
975 }
976 SemanticIndexKeyItemsRef::Static(IndexKeyItemsRef::Items(items)) => {
977 for (component_index, &key_item) in items.iter().enumerate() {
978 let field_slot = schema_info.field_slot_index(key_item.field())?;
979 targets.push(IndexCompileTarget {
980 component_index,
981 field_slot,
982 key_item,
983 });
984 }
985 }
986 }
987
988 Some(targets)
989}