1use crate::{
7 db::{
8 access::{AccessPlan, ExecutableAccessPlan},
9 predicate::{IndexCompileTarget, Predicate, PredicateProgram, normalize_enum_literals},
10 query::plan::{
11 AccessPlannedQuery, ContinuationPolicy, DistinctExecutionStrategy,
12 EffectiveRuntimeFilterProgram, ExecutionShapeSignature, GroupPlan,
13 GroupedAggregateExecutionSpec, GroupedDistinctExecutionStrategy, GroupedPlanStrategy,
14 LogicalPlan, PlannerRouteProfile, QueryMode, ResolvedOrder, ResolvedOrderField,
15 ResolvedOrderValueSource, ScalarPlan, StaticPlanningShape,
16 derive_logical_pushdown_eligibility,
17 expr::{
18 Expr, ProjectionField, ProjectionSpec, ScalarProjectionExpr,
19 canonicalize_runtime_predicate_via_bool_expr, compile_scalar_projection_expr,
20 compile_scalar_projection_plan, derive_normalized_bool_expr_predicate_subset,
21 normalize_bool_expr, projection_field_expr,
22 },
23 grouped_aggregate_execution_specs, grouped_aggregate_specs_from_projection_spec,
24 grouped_cursor_policy_violation, grouped_plan_strategy, lower_direct_projection_slots,
25 lower_projection_identity, lower_projection_intent,
26 residual_query_predicate_after_access_path_bounds,
27 residual_query_predicate_after_filtered_access,
28 resolved_grouped_distinct_execution_strategy_for_model,
29 },
30 schema::SchemaInfo,
31 },
32 error::InternalError,
33 model::{
34 entity::{EntityModel, resolve_field_slot},
35 index::IndexKeyItemsRef,
36 },
37};
38
39impl QueryMode {
40 #[must_use]
42 pub const fn is_load(&self) -> bool {
43 match self {
44 Self::Load(_) => true,
45 Self::Delete(_) => false,
46 }
47 }
48
49 #[must_use]
51 pub const fn is_delete(&self) -> bool {
52 match self {
53 Self::Delete(_) => true,
54 Self::Load(_) => false,
55 }
56 }
57}
58
59impl LogicalPlan {
60 #[must_use]
62 pub(in crate::db) const fn scalar_semantics(&self) -> &ScalarPlan {
63 match self {
64 Self::Scalar(plan) => plan,
65 Self::Grouped(plan) => &plan.scalar,
66 }
67 }
68
69 #[must_use]
71 #[cfg(test)]
72 pub(in crate::db) const fn scalar_semantics_mut(&mut self) -> &mut ScalarPlan {
73 match self {
74 Self::Scalar(plan) => plan,
75 Self::Grouped(plan) => &mut plan.scalar,
76 }
77 }
78
79 #[must_use]
81 #[cfg(test)]
82 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
83 self.scalar_semantics()
84 }
85
86 #[must_use]
88 #[cfg(test)]
89 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
90 self.scalar_semantics_mut()
91 }
92}
93
94impl AccessPlannedQuery {
95 #[must_use]
97 pub(in crate::db) const fn scalar_plan(&self) -> &ScalarPlan {
98 self.logical.scalar_semantics()
99 }
100
101 #[must_use]
103 #[cfg(test)]
104 pub(in crate::db) const fn scalar_plan_mut(&mut self) -> &mut ScalarPlan {
105 self.logical.scalar_semantics_mut()
106 }
107
108 #[must_use]
110 #[cfg(test)]
111 pub(in crate::db) const fn scalar(&self) -> &ScalarPlan {
112 self.scalar_plan()
113 }
114
115 #[must_use]
117 #[cfg(test)]
118 pub(in crate::db) const fn scalar_mut(&mut self) -> &mut ScalarPlan {
119 self.scalar_plan_mut()
120 }
121
122 #[must_use]
124 pub(in crate::db) const fn grouped_plan(&self) -> Option<&GroupPlan> {
125 match &self.logical {
126 LogicalPlan::Scalar(_) => None,
127 LogicalPlan::Grouped(plan) => Some(plan),
128 }
129 }
130
131 #[must_use]
133 pub(in crate::db) fn projection_spec(&self, model: &EntityModel) -> ProjectionSpec {
134 if let Some(static_shape) = &self.static_planning_shape {
135 return static_shape.projection_spec.clone();
136 }
137
138 lower_projection_intent(model, &self.logical, &self.projection_selection)
139 }
140
141 #[must_use]
143 pub(in crate::db::query) fn projection_spec_for_identity(&self) -> ProjectionSpec {
144 lower_projection_identity(&self.logical, &self.projection_selection)
145 }
146
147 #[must_use]
153 pub(in crate::db) fn execution_preparation_predicate(&self) -> Option<Predicate> {
154 if let Some(static_shape) = self.static_planning_shape.as_ref() {
155 return static_shape.execution_preparation_predicate.clone();
156 }
157
158 derive_execution_preparation_predicate(self)
159 }
160
161 #[must_use]
165 pub(in crate::db) fn effective_execution_predicate(&self) -> Option<Predicate> {
166 if let Some(static_shape) = self.static_planning_shape.as_ref() {
167 return static_shape.residual_filter_predicate.clone();
168 }
169
170 derive_residual_filter_predicate(self)
171 }
172
173 #[must_use]
176 pub(in crate::db) fn has_residual_filter_predicate(&self) -> bool {
177 self.effective_execution_predicate().is_some()
178 }
179
180 #[must_use]
183 pub(in crate::db) fn residual_filter_expr(&self) -> Option<&Expr> {
184 if let Some(static_shape) = self.static_planning_shape.as_ref() {
185 return static_shape.residual_filter_expr.as_ref();
186 }
187
188 if !derive_has_residual_filter(self) {
189 return None;
190 }
191
192 self.scalar_plan().filter_expr.as_ref()
193 }
194
195 #[must_use]
198 pub(in crate::db) fn has_residual_filter_expr(&self) -> bool {
199 self.residual_filter_expr().is_some()
200 }
201
202 #[must_use]
204 pub(in crate::db) const fn execution_preparation_compiled_predicate(
205 &self,
206 ) -> Option<&PredicateProgram> {
207 self.static_planning_shape()
208 .execution_preparation_compiled_predicate
209 .as_ref()
210 }
211
212 #[must_use]
214 pub(in crate::db) const fn effective_runtime_compiled_predicate(
215 &self,
216 ) -> Option<&PredicateProgram> {
217 match self
218 .static_planning_shape()
219 .effective_runtime_filter_program
220 .as_ref()
221 {
222 Some(EffectiveRuntimeFilterProgram::Predicate(program)) => Some(program),
223 Some(EffectiveRuntimeFilterProgram::Expr(_)) | None => None,
224 }
225 }
226
227 #[must_use]
229 pub(in crate::db) const fn effective_runtime_compiled_filter_expr(
230 &self,
231 ) -> Option<&ScalarProjectionExpr> {
232 match self
233 .static_planning_shape()
234 .effective_runtime_filter_program
235 .as_ref()
236 {
237 Some(EffectiveRuntimeFilterProgram::Expr(expr)) => Some(expr),
238 Some(EffectiveRuntimeFilterProgram::Predicate(_)) | None => None,
239 }
240 }
241
242 #[must_use]
244 pub(in crate::db) const fn effective_runtime_filter_program(
245 &self,
246 ) -> Option<&EffectiveRuntimeFilterProgram> {
247 self.static_planning_shape()
248 .effective_runtime_filter_program
249 .as_ref()
250 }
251
252 #[must_use]
254 pub(in crate::db) fn distinct_execution_strategy(&self) -> DistinctExecutionStrategy {
255 if !self.scalar_plan().distinct {
256 return DistinctExecutionStrategy::None;
257 }
258
259 match distinct_runtime_dedup_strategy(&self.access) {
263 Some(strategy) => strategy,
264 None => DistinctExecutionStrategy::None,
265 }
266 }
267
268 pub(in crate::db) fn finalize_planner_route_profile_for_model(&mut self, model: &EntityModel) {
270 self.set_planner_route_profile(project_planner_route_profile_for_model(model, self));
271 }
272
273 pub(in crate::db) fn finalize_static_planning_shape_for_model(
275 &mut self,
276 model: &EntityModel,
277 ) -> Result<(), InternalError> {
278 self.static_planning_shape = Some(project_static_planning_shape_for_model(model, self)?);
279
280 Ok(())
281 }
282
283 #[must_use]
285 pub(in crate::db) fn execution_shape_signature(
286 &self,
287 entity_path: &'static str,
288 ) -> ExecutionShapeSignature {
289 ExecutionShapeSignature::new(self.continuation_signature(entity_path))
290 }
291
292 #[must_use]
295 pub(in crate::db) fn predicate_fully_satisfied_by_access_contract(&self) -> bool {
296 if let Some(static_shape) = self.static_planning_shape.as_ref() {
297 return self.scalar_plan().predicate.is_some()
298 && static_shape.residual_filter_predicate.is_none()
299 && static_shape.residual_filter_expr.is_none();
300 }
301
302 derive_predicate_fully_satisfied_by_access_contract(self)
303 }
304
305 #[must_use]
307 pub(in crate::db) fn scalar_projection_plan(&self) -> Option<&[ScalarProjectionExpr]> {
308 self.static_planning_shape()
309 .scalar_projection_plan
310 .as_deref()
311 }
312
313 #[must_use]
315 pub(in crate::db) const fn primary_key_name(&self) -> &'static str {
316 self.static_planning_shape().primary_key_name
317 }
318
319 #[must_use]
321 pub(in crate::db) const fn projection_referenced_slots(&self) -> &[usize] {
322 self.static_planning_shape()
323 .projection_referenced_slots
324 .as_slice()
325 }
326
327 #[must_use]
329 #[cfg(any(test, feature = "diagnostics"))]
330 pub(in crate::db) const fn projected_slot_mask(&self) -> &[bool] {
331 self.static_planning_shape().projected_slot_mask.as_slice()
332 }
333
334 #[must_use]
336 pub(in crate::db) const fn projection_is_model_identity(&self) -> bool {
337 self.static_planning_shape().projection_is_model_identity
338 }
339
340 #[must_use]
342 pub(in crate::db) fn order_referenced_slots(&self) -> Option<&[usize]> {
343 self.static_planning_shape()
344 .order_referenced_slots
345 .as_deref()
346 }
347
348 #[must_use]
350 pub(in crate::db) const fn resolved_order(&self) -> Option<&ResolvedOrder> {
351 self.static_planning_shape().resolved_order.as_ref()
352 }
353
354 #[must_use]
356 pub(in crate::db) fn slot_map(&self) -> Option<&[usize]> {
357 self.static_planning_shape().slot_map.as_deref()
358 }
359
360 #[must_use]
362 pub(in crate::db) fn grouped_aggregate_execution_specs(
363 &self,
364 ) -> Option<&[GroupedAggregateExecutionSpec]> {
365 self.static_planning_shape()
366 .grouped_aggregate_execution_specs
367 .as_deref()
368 }
369
370 #[must_use]
372 pub(in crate::db) const fn grouped_distinct_execution_strategy(
373 &self,
374 ) -> Option<&GroupedDistinctExecutionStrategy> {
375 self.static_planning_shape()
376 .grouped_distinct_execution_strategy
377 .as_ref()
378 }
379
380 #[must_use]
382 pub(in crate::db) const fn frozen_projection_spec(&self) -> &ProjectionSpec {
383 &self.static_planning_shape().projection_spec
384 }
385
386 #[must_use]
388 pub(in crate::db) fn frozen_direct_projection_slots(&self) -> Option<&[usize]> {
389 self.static_planning_shape()
390 .projection_direct_slots
391 .as_deref()
392 }
393
394 #[must_use]
396 pub(in crate::db) fn index_compile_targets(&self) -> Option<&[IndexCompileTarget]> {
397 self.static_planning_shape()
398 .index_compile_targets
399 .as_deref()
400 }
401
402 const fn static_planning_shape(&self) -> &StaticPlanningShape {
403 self.static_planning_shape
404 .as_ref()
405 .expect("access-planned queries must freeze static planning shape before execution")
406 }
407}
408
409fn distinct_runtime_dedup_strategy<K>(access: &AccessPlan<K>) -> Option<DistinctExecutionStrategy> {
410 match access {
411 AccessPlan::Union(_) | AccessPlan::Intersection(_) => {
412 Some(DistinctExecutionStrategy::PreOrdered)
413 }
414 AccessPlan::Path(path) if path.as_ref().is_index_multi_lookup() => {
415 Some(DistinctExecutionStrategy::HashMaterialize)
416 }
417 AccessPlan::Path(_) => None,
418 }
419}
420
421fn derive_continuation_policy_validated(plan: &AccessPlannedQuery) -> ContinuationPolicy {
422 let is_grouped_safe = plan
423 .grouped_plan()
424 .is_none_or(|grouped| grouped_cursor_policy_violation(grouped, true).is_none());
425
426 ContinuationPolicy::new(
427 true, true, is_grouped_safe,
430 )
431}
432
433#[must_use]
435pub(in crate::db) fn project_planner_route_profile_for_model(
436 model: &EntityModel,
437 plan: &AccessPlannedQuery,
438) -> PlannerRouteProfile {
439 let secondary_order_contract = plan
440 .scalar_plan()
441 .order
442 .as_ref()
443 .and_then(|order| order.deterministic_secondary_order_contract(model.primary_key.name));
444
445 PlannerRouteProfile::new(
446 derive_continuation_policy_validated(plan),
447 derive_logical_pushdown_eligibility(plan, secondary_order_contract.as_ref()),
448 secondary_order_contract,
449 )
450}
451
452fn project_static_planning_shape_for_model(
453 model: &EntityModel,
454 plan: &AccessPlannedQuery,
455) -> Result<StaticPlanningShape, InternalError> {
456 let projection_spec = lower_projection_intent(model, &plan.logical, &plan.projection_selection);
457 let execution_preparation_predicate = plan.execution_preparation_predicate();
458 let residual_filter_predicate = derive_residual_filter_predicate(plan);
459 let residual_filter_expr = derive_residual_filter_expr_for_model(model, plan);
460 let execution_preparation_compiled_predicate =
461 compile_optional_predicate(model, execution_preparation_predicate.as_ref());
462 let effective_runtime_filter_program = compile_effective_runtime_filter_program(
463 model,
464 residual_filter_expr.as_ref(),
465 residual_filter_predicate.as_ref(),
466 )?;
467 let scalar_projection_plan =
468 if plan.grouped_plan().is_none() {
469 Some(compile_scalar_projection_plan(model, &projection_spec).ok_or_else(|| {
470 InternalError::query_executor_invariant(
471 "scalar projection program must compile during static planning finalization",
472 )
473 })?)
474 } else {
475 None
476 };
477 let (grouped_aggregate_execution_specs, grouped_distinct_execution_strategy) =
478 resolve_grouped_static_planning_semantics(model, plan, &projection_spec)?;
479 let projection_direct_slots =
480 lower_direct_projection_slots(model, &plan.logical, &plan.projection_selection);
481 let projection_referenced_slots =
482 projection_referenced_slots_for_spec(model, &projection_spec)?;
483 let projected_slot_mask =
484 projected_slot_mask_for_spec(model, projection_direct_slots.as_deref());
485 let projection_is_model_identity =
486 projection_is_model_identity_for_spec(model, &projection_spec);
487 let resolved_order = resolved_order_for_plan(model, plan)?;
488 let order_referenced_slots = order_referenced_slots_for_resolved_order(resolved_order.as_ref());
489 let slot_map = slot_map_for_model_plan(model, plan);
490 let index_compile_targets = index_compile_targets_for_model_plan(model, plan);
491
492 Ok(StaticPlanningShape {
493 primary_key_name: model.primary_key.name,
494 projection_spec,
495 execution_preparation_predicate,
496 residual_filter_expr,
497 residual_filter_predicate,
498 execution_preparation_compiled_predicate,
499 effective_runtime_filter_program,
500 scalar_projection_plan,
501 grouped_aggregate_execution_specs,
502 grouped_distinct_execution_strategy,
503 projection_direct_slots,
504 projection_referenced_slots,
505 projected_slot_mask,
506 projection_is_model_identity,
507 resolved_order,
508 order_referenced_slots,
509 slot_map,
510 index_compile_targets,
511 })
512}
513
514fn compile_effective_runtime_filter_program(
518 model: &EntityModel,
519 residual_filter_expr: Option<&Expr>,
520 residual_filter_predicate: Option<&Predicate>,
521) -> Result<Option<EffectiveRuntimeFilterProgram>, InternalError> {
522 if let Some(predicate) = residual_filter_predicate {
527 return Ok(Some(EffectiveRuntimeFilterProgram::Predicate(
528 PredicateProgram::compile(model, predicate),
529 )));
530 }
531
532 if let Some(filter_expr) = residual_filter_expr {
533 let compiled = compile_scalar_projection_expr(model, filter_expr).ok_or_else(|| {
534 InternalError::query_invalid_logical_plan(
535 "effective runtime scalar filter expression must compile during static planning finalization",
536 )
537 })?;
538
539 return Ok(Some(EffectiveRuntimeFilterProgram::Expr(compiled)));
540 }
541
542 Ok(None)
543}
544
545fn derive_execution_preparation_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
549 let query_predicate = plan.scalar_plan().predicate.as_ref()?;
550
551 match plan.access.selected_index_model() {
552 Some(index) => residual_query_predicate_after_filtered_access(index, query_predicate),
553 None => Some(query_predicate.clone()),
554 }
555}
556
557fn derive_residual_filter_predicate(plan: &AccessPlannedQuery) -> Option<Predicate> {
561 let filtered_residual = derive_execution_preparation_predicate(plan);
562 let filtered_residual = filtered_residual.as_ref()?;
563
564 residual_query_predicate_after_access_path_bounds(plan.access.as_path(), filtered_residual)
565}
566
567fn derive_residual_filter_expr(plan: &AccessPlannedQuery) -> Option<Expr> {
571 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
572 if derive_semantic_filter_fully_satisfied_by_access_contract(plan) {
573 return None;
574 }
575
576 Some(filter_expr.clone())
577}
578
579fn derive_residual_filter_expr_for_model(
583 model: &EntityModel,
584 plan: &AccessPlannedQuery,
585) -> Option<Expr> {
586 let filter_expr = plan.scalar_plan().filter_expr.as_ref()?;
587 if derive_semantic_filter_fully_satisfied_by_access_contract_for_model(model, plan) {
588 return None;
589 }
590
591 Some(filter_expr.clone())
592}
593
594fn derive_has_residual_filter(plan: &AccessPlannedQuery) -> bool {
598 match (
599 plan.scalar_plan().filter_expr.as_ref(),
600 plan.scalar_plan().predicate.as_ref(),
601 ) {
602 (None, None) => false,
603 (Some(_), None) => true,
604 (Some(_) | None, Some(_)) => !plan.predicate_fully_satisfied_by_access_contract(),
605 }
606}
607
608fn derive_predicate_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
611 plan.scalar_plan().predicate.is_some()
612 && derive_residual_filter_predicate(plan).is_none()
613 && derive_residual_filter_expr(plan).is_none()
614}
615
616fn derive_semantic_filter_fully_satisfied_by_access_contract(plan: &AccessPlannedQuery) -> bool {
620 let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
621 return false;
622 };
623 let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
624 let Some(filter_predicate) =
625 derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
626 else {
627 return false;
628 };
629 let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
630 return false;
631 };
632
633 canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
634 == canonicalize_runtime_predicate_via_bool_expr(query_predicate.clone())
635}
636
637fn derive_semantic_filter_fully_satisfied_by_access_contract_for_model(
641 model: &EntityModel,
642 plan: &AccessPlannedQuery,
643) -> bool {
644 let Some(filter_expr) = plan.scalar_plan().filter_expr.as_ref() else {
645 return false;
646 };
647 let normalized_filter_expr = normalize_bool_expr(filter_expr.clone());
648 let Some(filter_predicate) =
649 derive_normalized_bool_expr_predicate_subset(&normalized_filter_expr)
650 else {
651 return false;
652 };
653 let Some(query_predicate) = plan.scalar_plan().predicate.as_ref() else {
654 return false;
655 };
656 let schema = SchemaInfo::cached_for_entity_model(model);
657 let Ok(filter_predicate) = normalize_enum_literals(schema, &filter_predicate) else {
658 return false;
659 };
660 let Ok(query_predicate) = normalize_enum_literals(schema, query_predicate) else {
661 return false;
662 };
663
664 canonicalize_runtime_predicate_via_bool_expr(filter_predicate)
665 == canonicalize_runtime_predicate_via_bool_expr(query_predicate)
666}
667
668fn compile_optional_predicate(
671 model: &EntityModel,
672 predicate: Option<&Predicate>,
673) -> Option<PredicateProgram> {
674 predicate.map(|predicate| PredicateProgram::compile(model, predicate))
675}
676
677fn resolve_grouped_static_planning_semantics(
681 model: &EntityModel,
682 plan: &AccessPlannedQuery,
683 projection_spec: &ProjectionSpec,
684) -> Result<
685 (
686 Option<Vec<GroupedAggregateExecutionSpec>>,
687 Option<GroupedDistinctExecutionStrategy>,
688 ),
689 InternalError,
690> {
691 let Some(grouped) = plan.grouped_plan() else {
692 return Ok((None, None));
693 };
694
695 let mut aggregate_specs = grouped_aggregate_specs_from_projection_spec(
696 projection_spec,
697 grouped.group.group_fields.as_slice(),
698 grouped.group.aggregates.as_slice(),
699 )?;
700 extend_grouped_having_aggregate_specs(&mut aggregate_specs, grouped)?;
701
702 let grouped_aggregate_execution_specs = Some(grouped_aggregate_execution_specs(
703 model,
704 aggregate_specs.as_slice(),
705 )?);
706 let grouped_distinct_execution_strategy =
707 Some(resolved_grouped_distinct_execution_strategy_for_model(
708 model,
709 grouped.group.group_fields.as_slice(),
710 grouped.group.aggregates.as_slice(),
711 grouped.having_expr.as_ref(),
712 )?);
713
714 Ok((
715 grouped_aggregate_execution_specs,
716 grouped_distinct_execution_strategy,
717 ))
718}
719
720fn extend_grouped_having_aggregate_specs(
721 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
722 grouped: &GroupPlan,
723) -> Result<(), InternalError> {
724 if let Some(having_expr) = grouped.having_expr.as_ref() {
725 collect_grouped_having_expr_aggregate_specs(aggregate_specs, having_expr)?;
726 }
727
728 Ok(())
729}
730
731fn collect_grouped_having_expr_aggregate_specs(
732 aggregate_specs: &mut Vec<GroupedAggregateExecutionSpec>,
733 expr: &Expr,
734) -> Result<(), InternalError> {
735 match expr {
736 Expr::Aggregate(aggregate_expr) => {
737 let aggregate_spec = GroupedAggregateExecutionSpec::from_aggregate_expr(aggregate_expr);
738
739 if aggregate_specs
740 .iter()
741 .all(|current| current != &aggregate_spec)
742 {
743 aggregate_specs.push(aggregate_spec);
744 }
745 }
746 Expr::Field(_) | Expr::Literal(_) => {}
747 Expr::FunctionCall { args, .. } => {
748 for arg in args {
749 collect_grouped_having_expr_aggregate_specs(aggregate_specs, arg)?;
750 }
751 }
752 Expr::Unary { expr, .. } => {
753 collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
754 }
755 Expr::Case {
756 when_then_arms,
757 else_expr,
758 } => {
759 for arm in when_then_arms {
760 collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.condition())?;
761 collect_grouped_having_expr_aggregate_specs(aggregate_specs, arm.result())?;
762 }
763
764 collect_grouped_having_expr_aggregate_specs(aggregate_specs, else_expr)?;
765 }
766 Expr::Binary { left, right, .. } => {
767 collect_grouped_having_expr_aggregate_specs(aggregate_specs, left)?;
768 collect_grouped_having_expr_aggregate_specs(aggregate_specs, right)?;
769 }
770 #[cfg(test)]
771 Expr::Alias { expr, .. } => {
772 collect_grouped_having_expr_aggregate_specs(aggregate_specs, expr)?;
773 }
774 }
775
776 Ok(())
777}
778
779fn projection_referenced_slots_for_spec(
780 model: &EntityModel,
781 projection: &ProjectionSpec,
782) -> Result<Vec<usize>, InternalError> {
783 let mut referenced = vec![false; model.fields().len()];
784
785 for field in projection.fields() {
786 mark_projection_expr_slots(
787 model,
788 projection_field_expr(field),
789 referenced.as_mut_slice(),
790 )?;
791 }
792
793 Ok(referenced
794 .into_iter()
795 .enumerate()
796 .filter_map(|(slot, required)| required.then_some(slot))
797 .collect())
798}
799
800fn mark_projection_expr_slots(
801 model: &EntityModel,
802 expr: &Expr,
803 referenced: &mut [bool],
804) -> Result<(), InternalError> {
805 match expr {
806 Expr::Field(field_id) => {
807 let field_name = field_id.as_str();
808 let slot = resolve_required_field_slot(model, field_name, || {
809 InternalError::query_invalid_logical_plan(format!(
810 "projection expression references unknown field '{field_name}'",
811 ))
812 })?;
813 referenced[slot] = true;
814 }
815 Expr::Literal(_) => {}
816 Expr::FunctionCall { args, .. } => {
817 for arg in args {
818 mark_projection_expr_slots(model, arg, referenced)?;
819 }
820 }
821 Expr::Case {
822 when_then_arms,
823 else_expr,
824 } => {
825 for arm in when_then_arms {
826 mark_projection_expr_slots(model, arm.condition(), referenced)?;
827 mark_projection_expr_slots(model, arm.result(), referenced)?;
828 }
829 mark_projection_expr_slots(model, else_expr.as_ref(), referenced)?;
830 }
831 Expr::Aggregate(_) => {}
832 #[cfg(test)]
833 Expr::Alias { expr, .. } => {
834 mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
835 }
836 Expr::Unary { expr, .. } => {
837 mark_projection_expr_slots(model, expr.as_ref(), referenced)?;
838 }
839 Expr::Binary { left, right, .. } => {
840 mark_projection_expr_slots(model, left.as_ref(), referenced)?;
841 mark_projection_expr_slots(model, right.as_ref(), referenced)?;
842 }
843 }
844
845 Ok(())
846}
847
848fn projected_slot_mask_for_spec(
849 model: &EntityModel,
850 direct_projection_slots: Option<&[usize]>,
851) -> Vec<bool> {
852 let mut projected_slots = vec![false; model.fields().len()];
853
854 let Some(direct_projection_slots) = direct_projection_slots else {
855 return projected_slots;
856 };
857
858 for slot in direct_projection_slots.iter().copied() {
859 if let Some(projected) = projected_slots.get_mut(slot) {
860 *projected = true;
861 }
862 }
863
864 projected_slots
865}
866
867fn projection_is_model_identity_for_spec(model: &EntityModel, projection: &ProjectionSpec) -> bool {
868 if projection.len() != model.fields().len() {
869 return false;
870 }
871
872 for (field_model, projected_field) in model.fields().iter().zip(projection.fields()) {
873 match projected_field {
874 ProjectionField::Scalar {
875 expr: Expr::Field(field_id),
876 alias: None,
877 } if field_id.as_str() == field_model.name() => {}
878 ProjectionField::Scalar { .. } => return false,
879 }
880 }
881
882 true
883}
884
885fn resolved_order_for_plan(
886 model: &EntityModel,
887 plan: &AccessPlannedQuery,
888) -> Result<Option<ResolvedOrder>, InternalError> {
889 if grouped_plan_strategy(plan).is_some_and(GroupedPlanStrategy::is_top_k_group) {
890 return Ok(None);
891 }
892
893 let Some(order) = plan.scalar_plan().order.as_ref() else {
894 return Ok(None);
895 };
896
897 let mut fields = Vec::with_capacity(order.fields.len());
898 for term in &order.fields {
899 fields.push(ResolvedOrderField::new(
900 resolved_order_value_source_for_term(model, term)?,
901 term.direction(),
902 ));
903 }
904
905 Ok(Some(ResolvedOrder::new(fields)))
906}
907
908fn resolved_order_value_source_for_term(
909 model: &EntityModel,
910 term: &crate::db::query::plan::OrderTerm,
911) -> Result<ResolvedOrderValueSource, InternalError> {
912 if term.direct_field().is_none() {
913 let rendered = term.rendered_label();
914 validate_resolved_order_expr_fields(model, term.expr(), rendered.as_str())?;
915 let compiled = compile_scalar_projection_expr(model, term.expr())
916 .ok_or_else(|| order_expression_scalar_seam_error(rendered.as_str()))?;
917
918 return Ok(ResolvedOrderValueSource::expression(compiled));
919 }
920
921 let field = term
922 .direct_field()
923 .expect("direct-field order branch should only execute for field-backed terms");
924 let slot = resolve_required_field_slot(model, field, || {
925 InternalError::query_invalid_logical_plan(format!(
926 "order expression references unknown field '{field}'",
927 ))
928 })?;
929
930 Ok(ResolvedOrderValueSource::direct_field(slot))
931}
932
933fn validate_resolved_order_expr_fields(
934 model: &EntityModel,
935 expr: &Expr,
936 rendered: &str,
937) -> Result<(), InternalError> {
938 match expr {
939 Expr::Field(field_id) => {
940 resolve_required_field_slot(model, field_id.as_str(), || {
941 InternalError::query_invalid_logical_plan(format!(
942 "order expression references unknown field '{rendered}'",
943 ))
944 })?;
945 }
946 Expr::Literal(_) => {}
947 Expr::FunctionCall { args, .. } => {
948 for arg in args {
949 validate_resolved_order_expr_fields(model, arg, rendered)?;
950 }
951 }
952 Expr::Case {
953 when_then_arms,
954 else_expr,
955 } => {
956 for arm in when_then_arms {
957 validate_resolved_order_expr_fields(model, arm.condition(), rendered)?;
958 validate_resolved_order_expr_fields(model, arm.result(), rendered)?;
959 }
960 validate_resolved_order_expr_fields(model, else_expr.as_ref(), rendered)?;
961 }
962 Expr::Binary { left, right, .. } => {
963 validate_resolved_order_expr_fields(model, left.as_ref(), rendered)?;
964 validate_resolved_order_expr_fields(model, right.as_ref(), rendered)?;
965 }
966 Expr::Aggregate(_) => {
967 return Err(order_expression_scalar_seam_error(rendered));
968 }
969 #[cfg(test)]
970 Expr::Alias { .. } => {
971 return Err(order_expression_scalar_seam_error(rendered));
972 }
973 Expr::Unary { .. } => {
974 return Err(order_expression_scalar_seam_error(rendered));
975 }
976 }
977
978 Ok(())
979}
980
981fn resolve_required_field_slot<F>(
984 model: &EntityModel,
985 field: &str,
986 invalid_plan_error: F,
987) -> Result<usize, InternalError>
988where
989 F: FnOnce() -> InternalError,
990{
991 resolve_field_slot(model, field).ok_or_else(invalid_plan_error)
992}
993
994fn order_expression_scalar_seam_error(rendered: &str) -> InternalError {
997 InternalError::query_invalid_logical_plan(format!(
998 "order expression '{rendered}' did not stay on the scalar expression seam",
999 ))
1000}
1001
1002fn order_referenced_slots_for_resolved_order(
1007 resolved_order: Option<&ResolvedOrder>,
1008) -> Option<Vec<usize>> {
1009 let resolved_order = resolved_order?;
1010 let mut referenced = Vec::new();
1011
1012 for field in resolved_order.fields() {
1015 field.source().extend_referenced_slots(&mut referenced);
1016 }
1017
1018 Some(referenced)
1019}
1020
1021fn slot_map_for_model_plan(model: &EntityModel, plan: &AccessPlannedQuery) -> Option<Vec<usize>> {
1022 let access_strategy = plan.access.resolve_strategy();
1023 let executable = access_strategy.executable();
1024
1025 resolved_index_slots_for_access_path(model, executable)
1026}
1027
1028fn resolved_index_slots_for_access_path(
1029 model: &EntityModel,
1030 access: &ExecutableAccessPlan<'_, crate::value::Value>,
1031) -> Option<Vec<usize>> {
1032 let path = access.as_path()?;
1033 let path_capabilities = path.capabilities();
1034 let index_fields = path_capabilities.index_fields_for_slot_map()?;
1035 let mut slots = Vec::with_capacity(index_fields.len());
1036
1037 for field_name in index_fields {
1038 let slot = resolve_field_slot(model, field_name)?;
1039 slots.push(slot);
1040 }
1041
1042 Some(slots)
1043}
1044
1045fn index_compile_targets_for_model_plan(
1046 model: &EntityModel,
1047 plan: &AccessPlannedQuery,
1048) -> Option<Vec<IndexCompileTarget>> {
1049 let index = plan.access.as_path()?.selected_index_model()?;
1050 let mut targets = Vec::new();
1051
1052 match index.key_items() {
1053 IndexKeyItemsRef::Fields(fields) => {
1054 for (component_index, &field_name) in fields.iter().enumerate() {
1055 let field_slot = resolve_field_slot(model, field_name)?;
1056 targets.push(IndexCompileTarget {
1057 component_index,
1058 field_slot,
1059 key_item: crate::model::index::IndexKeyItem::Field(field_name),
1060 });
1061 }
1062 }
1063 IndexKeyItemsRef::Items(items) => {
1064 for (component_index, &key_item) in items.iter().enumerate() {
1065 let field_slot = resolve_field_slot(model, key_item.field())?;
1066 targets.push(IndexCompileTarget {
1067 component_index,
1068 field_slot,
1069 key_item,
1070 });
1071 }
1072 }
1073 }
1074
1075 Some(targets)
1076}