1#![deny(unreachable_patterns)]
6
7mod execute;
8mod fast_stream;
9mod index_range_limit;
10mod page;
11mod pk_stream;
12mod secondary_index;
13mod terminal;
14mod trace;
15
16pub(in crate::db::executor) use self::execute::{
17 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
18};
19
20use self::trace::{access_path_variant, execution_order_direction};
21use crate::{
22 db::{
23 Context, Db, GroupedRow,
24 access::AccessPlan,
25 contracts::canonical_value_compare,
26 cursor::{
27 ContinuationSignature, ContinuationToken, CursorBoundary, GroupedContinuationToken,
28 GroupedPlannedCursor, PlannedCursor, decode_pk_cursor_boundary,
29 },
30 data::DataKey,
31 direction::Direction,
32 executor::{
33 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
34 KeyOrderComparator, OrderedKeyStreamBox,
35 aggregate::field::{
36 AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
37 extract_orderable_field_value, resolve_any_aggregate_target_slot,
38 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
39 },
40 aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
41 group::{
42 CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
43 grouped_execution_context_from_planner_config,
44 },
45 plan_metrics::{
46 GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
47 record_rows_scanned,
48 },
49 range_token_anchor_key, range_token_from_cursor_anchor,
50 route::aggregate_materialized_fold_direction,
51 validate_executor_plan,
52 },
53 index::IndexCompilePolicy,
54 predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
55 query::plan::{
56 AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
57 LogicalPlan, OrderDirection, grouped_executor_handoff,
58 },
59 response::Response,
60 },
61 error::InternalError,
62 obs::sink::{ExecKind, Span},
63 traits::{EntityKind, EntityValue},
64 types::Decimal,
65 value::Value,
66};
67use std::{cmp::Ordering, marker::PhantomData};
68
/// Continuation cursor carried alongside a page of results.
///
/// Wraps either a scalar or a grouped continuation token so both pagination
/// flavors can hand back a single cursor type.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Cursor holding a scalar `ContinuationToken`.
    Scalar(ContinuationToken),
    /// Cursor holding a `GroupedContinuationToken`.
    Grouped(GroupedContinuationToken),
}
79
80impl PageCursor {
81 #[must_use]
83 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
84 match self {
85 Self::Scalar(token) => Some(token),
86 Self::Grouped(_) => None,
87 }
88 }
89
90 #[must_use]
92 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
93 match self {
94 Self::Scalar(_) => None,
95 Self::Grouped(token) => Some(token),
96 }
97 }
98}
99
100impl From<ContinuationToken> for PageCursor {
101 fn from(value: ContinuationToken) -> Self {
102 Self::Scalar(value)
103 }
104}
105
106impl From<GroupedContinuationToken> for PageCursor {
107 fn from(value: GroupedContinuationToken) -> Self {
108 Self::Grouped(value)
109 }
110}
111
/// One page of scalar load results plus the cursor needed to request the next page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Entities returned for this page.
    pub(crate) items: Response<E>,

    /// Cursor for the following page, if one was produced.
    pub(crate) next_cursor: Option<PageCursor>,
}
125
/// One page of grouped aggregate rows plus the cursor needed to request the next page.
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    /// Grouped rows emitted for this page.
    pub(in crate::db) rows: Vec<GroupedRow>,
    /// Cursor for the following page; `None` when no further page was detected.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
136
/// Access-path shape reported in an [`ExecutionTrace`].
///
/// The variant is derived from the plan's access description when a trace is
/// created; it carries no payload.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
154
/// Optimization label reported in an [`ExecutionTrace`] and by fast-path key results.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
167
/// Diagnostic summary of one load execution.
///
/// Created with zeroed counters when the executor runs in debug mode and
/// filled in once the execution outcome is known.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access path taken from the plan.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction the execution ran in.
    pub direction: OrderDirection,
    /// Optimization reported for the execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Number of keys scanned (saturated to `u64::MAX` on conversion overflow).
    pub keys_scanned: u64,
    /// Number of rows returned (saturated to `u64::MAX` on conversion overflow).
    pub rows_returned: u64,
    /// Whether the execution resumed from a continuation.
    pub continuation_applied: bool,
    /// Whether an index predicate was applied.
    pub index_predicate_applied: bool,
    /// Number of keys rejected by the index predicate.
    pub index_predicate_keys_rejected: u64,
    /// Number of keys removed by distinct de-duplication.
    pub distinct_keys_deduped: u64,
}
187
188impl ExecutionTrace {
189 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190 Self {
191 access_path_variant: access_path_variant(access),
192 direction: execution_order_direction(direction),
193 optimization: None,
194 keys_scanned: 0,
195 rows_returned: 0,
196 continuation_applied,
197 index_predicate_applied: false,
198 index_predicate_keys_rejected: 0,
199 distinct_keys_deduped: 0,
200 }
201 }
202
203 fn set_path_outcome(
204 &mut self,
205 optimization: Option<ExecutionOptimization>,
206 keys_scanned: usize,
207 rows_returned: usize,
208 index_predicate_applied: bool,
209 index_predicate_keys_rejected: u64,
210 distinct_keys_deduped: u64,
211 ) {
212 self.optimization = optimization;
213 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215 self.index_predicate_applied = index_predicate_applied;
216 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217 self.distinct_keys_deduped = distinct_keys_deduped;
218 }
219}
220
/// Builds the key-stream order comparator for `direction`.
///
/// Thin wrapper over `KeyOrderComparator::from_direction`.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
227
/// Result of a fast-path key resolution: the ordered key stream plus its bookkeeping.
pub(in crate::db::executor) struct FastPathKeyResult {
    /// Ordered stream of keys produced by the fast path.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Number of rows scanned while producing the stream.
    pub(in crate::db::executor) rows_scanned: usize,
    /// Optimization label associated with this fast path.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
240
/// Executor for load plans over entity type `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle used to obtain a recovered execution context.
    db: Db<E::Canister>,
    /// When `true`, executions also produce an `ExecutionTrace`.
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
254
/// Output of grouped route resolution: everything later grouped stages need,
/// captured once from the executable plan and cursor.
struct GroupedRouteStage<E: EntityKind + EntityValue> {
    /// Inner access-planned query taken out of the executable plan.
    plan: AccessPlannedQuery<E::Key>,
    /// Validated grouped cursor supplied by the caller.
    cursor: GroupedPlannedCursor,
    /// Direction reported by the grouped route plan.
    direction: Direction,
    /// Signature stamped into any continuation token emitted for this plan.
    continuation_signature: ContinuationSignature,
    /// Owned copy of the plan's lowered index-prefix specs.
    index_prefix_specs: Vec<crate::db::access::LoweredIndexPrefixSpec>,
    /// Owned copy of the plan's lowered index-range specs.
    index_range_specs: Vec<crate::db::access::LoweredIndexRangeSpec>,
    /// Grouped execution configuration from the planner handoff.
    grouped_execution: crate::db::query::plan::GroupedExecutionConfig,
    /// Field slots that form the group key.
    group_fields: Vec<crate::db::query::plan::FieldSlot>,
    /// Aggregate terminals to evaluate per group.
    grouped_aggregates: Vec<GroupAggregateSpec>,
    /// Optional HAVING specification applied to finalized groups.
    grouped_having: Option<GroupHavingSpec>,
    /// Route plan built from the grouped handoff.
    grouped_route_plan: crate::db::executor::ExecutionPlan,
    /// Strategy label used when recording grouped plan metrics.
    grouped_plan_metrics_strategy: GroupedPlanMetricsStrategy,
    /// Set when the plan is the global DISTINCT field-aggregate shape: (kind, target field).
    global_distinct_field_aggregate: Option<(AggregateKind, String)>,
    /// Trace being accumulated; present only when the executor runs in debug mode.
    execution_trace: Option<ExecutionTrace>,
}
279
/// Output of grouped stream construction: the context and resolved key stream
/// that the fold stage consumes.
struct GroupedStreamStage<'a, E: EntityKind + EntityValue> {
    /// Recovered context used to read rows for streamed keys.
    ctx: Context<'a, E>,
    /// Preparation derived from the plan; supplies the compiled predicate during folding.
    execution_preparation: ExecutionPreparation,
    /// Resolved key stream together with its scan bookkeeping.
    resolved: ResolvedExecutionKeyStream,
}
293
/// Output of the grouped fold: the assembled page plus counters used for
/// tracing and sanity checks during finalization.
struct GroupedFoldStage {
    /// Page of grouped rows and its optional continuation cursor.
    page: GroupedCursorPage,
    /// Number of rows that passed the residual predicate.
    filtered_rows: usize,
    /// Whether finalization should debug-assert `filtered_rows >= rows returned`.
    check_filtered_rows_upper_bound: bool,
    /// Rows scanned, as reported to the trace.
    rows_scanned: usize,
    /// Optimization reported by the resolved key stream, if any.
    optimization: Option<ExecutionOptimization>,
    /// Whether an index predicate was applied while resolving the stream.
    index_predicate_applied: bool,
    /// Number of keys rejected by the index predicate.
    index_predicate_keys_rejected: u64,
    /// Number of keys removed by distinct de-duplication.
    distinct_keys_deduped: u64,
}
312
313impl<E> LoadExecutor<E>
314where
315 E: EntityKind + EntityValue,
316{
317 #[must_use]
319 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
320 Self {
321 db,
322 debug,
323 _marker: PhantomData,
324 }
325 }
326
/// Obtains a recovered execution context for `E` from the underlying database handle.
///
/// # Errors
/// Propagates any error returned by the database handle.
pub(in crate::db::executor) fn recovered_context(
    &self,
) -> Result<crate::db::Context<'_, E>, InternalError> {
    self.db.recovered_context::<E>()
}
333
334 pub(in crate::db::executor) fn resolve_orderable_field_slot(
337 target_field: &str,
338 ) -> Result<FieldSlot, InternalError> {
339 resolve_orderable_aggregate_target_slot::<E>(target_field)
340 .map_err(AggregateFieldValueError::into_internal_error)
341 }
342
343 pub(in crate::db::executor) fn resolve_any_field_slot(
346 target_field: &str,
347 ) -> Result<FieldSlot, InternalError> {
348 resolve_any_aggregate_target_slot::<E>(target_field)
349 .map_err(AggregateFieldValueError::into_internal_error)
350 }
351
352 pub(in crate::db::executor) fn resolve_numeric_field_slot(
355 target_field: &str,
356 ) -> Result<FieldSlot, InternalError> {
357 resolve_numeric_aggregate_target_slot::<E>(target_field)
358 .map_err(AggregateFieldValueError::into_internal_error)
359 }
360
361 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
362 self.execute_paged_with_cursor(plan, PlannedCursor::none())
363 .map(|page| page.items)
364 }
365
366 pub(in crate::db) fn execute_paged_with_cursor(
367 &self,
368 plan: ExecutablePlan<E>,
369 cursor: impl Into<PlannedCursor>,
370 ) -> Result<CursorPage<E>, InternalError> {
371 self.execute_paged_with_cursor_traced(plan, cursor)
372 .map(|(page, _)| page)
373 }
374
375 pub(in crate::db) fn execute_paged_with_cursor_traced(
376 &self,
377 plan: ExecutablePlan<E>,
378 cursor: impl Into<PlannedCursor>,
379 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
380 if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
381 return Err(InternalError::query_executor_invariant(
382 "grouped plans require execute_grouped pagination entrypoints",
383 ));
384 }
385
386 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
387 let cursor_boundary = cursor.boundary().cloned();
388 let index_range_token = cursor
389 .index_range_anchor()
390 .map(range_token_from_cursor_anchor);
391
392 if !plan.mode().is_load() {
393 return Err(InternalError::query_executor_invariant(
394 "load executor requires load plans",
395 ));
396 }
397
398 let continuation_signature = plan.continuation_signature();
399 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
400 let index_range_specs = plan.index_range_specs()?.to_vec();
401 let route_plan = Self::build_execution_route_plan_for_load(
402 plan.as_inner(),
403 cursor_boundary.as_ref(),
404 index_range_token.as_ref(),
405 None,
406 )?;
407 let continuation_applied = !matches!(
408 route_plan.continuation_mode(),
409 crate::db::executor::route::ContinuationMode::Initial
410 );
411 let direction = route_plan.direction();
412 debug_assert_eq!(
413 route_plan.window().effective_offset,
414 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
415 "route window effective offset must match logical plan offset semantics",
416 );
417 let mut execution_trace = self
418 .debug
419 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
420 let plan = plan.into_inner();
421 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
422
423 let result = (|| {
424 let mut span = Span::<E>::new(ExecKind::Load);
425
426 validate_executor_plan::<E>(&plan)?;
427 let ctx = self.db.recovered_context::<E>()?;
428 let execution_inputs = ExecutionInputs {
429 ctx: &ctx,
430 plan: &plan,
431 stream_bindings: AccessStreamBindings {
432 index_prefix_specs: index_prefix_specs.as_slice(),
433 index_range_specs: index_range_specs.as_slice(),
434 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
435 direction,
436 },
437 execution_preparation: &execution_preparation,
438 };
439
440 record_plan_metrics(&plan.access);
441 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
444 &execution_inputs,
445 &route_plan,
446 cursor_boundary.as_ref(),
447 continuation_signature,
448 IndexCompilePolicy::ConservativeSubset,
449 )?;
450 let page = materialized.page;
451 let rows_scanned = materialized.rows_scanned;
452 let post_access_rows = materialized.post_access_rows;
453 let optimization = materialized.optimization;
454 let index_predicate_applied = materialized.index_predicate_applied;
455 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
456 let distinct_keys_deduped = materialized.distinct_keys_deduped;
457
458 Ok(Self::finalize_execution(
459 page,
460 optimization,
461 rows_scanned,
462 post_access_rows,
463 index_predicate_applied,
464 index_predicate_keys_rejected,
465 distinct_keys_deduped,
466 &mut span,
467 &mut execution_trace,
468 ))
469 })();
470
471 result.map(|page| (page, execution_trace))
472 }
473
474 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
475 &self,
476 plan: ExecutablePlan<E>,
477 cursor: impl Into<GroupedPlannedCursor>,
478 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
479 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
480 return Err(InternalError::query_executor_invariant(
481 "grouped execution requires grouped logical plans",
482 ));
483 }
484
485 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
486
487 self.execute_grouped_path(plan, cursor)
488 }
489
490 fn execute_grouped_path(
496 &self,
497 plan: ExecutablePlan<E>,
498 cursor: GroupedPlannedCursor,
499 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
500 let route = Self::resolve_grouped_route(plan, cursor, self.debug)?;
501 let stream = self.build_grouped_stream(&route)?;
502 let folded = Self::execute_group_fold(&route, stream)?;
503
504 Ok(Self::finalize_grouped_output(route, folded))
505 }
506
507 fn resolve_grouped_route(
509 plan: ExecutablePlan<E>,
510 cursor: GroupedPlannedCursor,
511 debug: bool,
512 ) -> Result<GroupedRouteStage<E>, InternalError> {
513 validate_executor_plan::<E>(plan.as_inner())?;
514 let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
515 let grouped_execution = grouped_handoff.execution();
516 let group_fields = grouped_handoff.group_fields().to_vec();
517 let grouped_aggregates = grouped_handoff.aggregates().to_vec();
518 let grouped_having = grouped_handoff.having().cloned();
519 let grouped_route_plan =
520 Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
521 let grouped_route_observability =
522 grouped_route_plan.grouped_observability().ok_or_else(|| {
523 InternalError::query_executor_invariant(
524 "grouped route planning must emit grouped observability payload",
525 )
526 })?;
527 let grouped_route_outcome = grouped_route_observability.outcome();
528 let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
529 let grouped_route_eligible = grouped_route_observability.eligible();
530 let grouped_route_execution_mode = grouped_route_observability.execution_mode();
531 let grouped_plan_metrics_strategy =
532 match grouped_route_observability.grouped_execution_strategy() {
533 crate::db::executor::route::GroupedExecutionStrategy::HashMaterialized => {
534 GroupedPlanMetricsStrategy::HashMaterialized
535 }
536 crate::db::executor::route::GroupedExecutionStrategy::OrderedMaterialized => {
537 GroupedPlanMetricsStrategy::OrderedMaterialized
538 }
539 };
540 debug_assert!(
541 grouped_route_eligible == grouped_route_rejection_reason.is_none(),
542 "grouped route eligibility and rejection reason must stay aligned",
543 );
544 debug_assert!(
545 grouped_route_outcome
546 != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
547 || grouped_route_rejection_reason.is_some(),
548 "grouped rejected outcomes must carry a rejection reason",
549 );
550 debug_assert!(
551 matches!(
552 grouped_route_execution_mode,
553 crate::db::executor::route::ExecutionMode::Materialized
554 ),
555 "grouped execution route must remain blocking/materialized",
556 );
557
558 let direction = grouped_route_plan.direction();
559 let continuation_applied = !cursor.is_empty();
560 let execution_trace =
561 debug.then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
562 let continuation_signature = plan.continuation_signature();
563 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
564 let index_range_specs = plan.index_range_specs()?.to_vec();
565 let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
566 group_fields.as_slice(),
567 grouped_aggregates.as_slice(),
568 grouped_having.as_ref(),
569 )?;
570 let plan = plan.into_inner();
571
572 Ok(GroupedRouteStage {
573 plan,
574 cursor,
575 direction,
576 continuation_signature,
577 index_prefix_specs,
578 index_range_specs,
579 grouped_execution,
580 group_fields,
581 grouped_aggregates,
582 grouped_having,
583 grouped_route_plan,
584 grouped_plan_metrics_strategy,
585 global_distinct_field_aggregate,
586 execution_trace,
587 })
588 }
589
590 fn build_grouped_stream<'a>(
592 &'a self,
593 route: &'a GroupedRouteStage<E>,
594 ) -> Result<GroupedStreamStage<'a, E>, InternalError> {
595 let execution_preparation = ExecutionPreparation::for_plan::<E>(&route.plan);
596 let ctx = self.db.recovered_context::<E>()?;
597 let execution_inputs = ExecutionInputs {
598 ctx: &ctx,
599 plan: &route.plan,
600 stream_bindings: AccessStreamBindings {
601 index_prefix_specs: route.index_prefix_specs.as_slice(),
602 index_range_specs: route.index_range_specs.as_slice(),
603 index_range_anchor: None,
604 direction: route.direction,
605 },
606 execution_preparation: &execution_preparation,
607 };
608 record_grouped_plan_metrics(&route.plan.access, route.grouped_plan_metrics_strategy);
609 let resolved = Self::resolve_execution_key_stream_without_distinct(
610 &execution_inputs,
611 &route.grouped_route_plan,
612 IndexCompilePolicy::ConservativeSubset,
613 )?;
614
615 Ok(GroupedStreamStage {
616 ctx,
617 execution_preparation,
618 resolved,
619 })
620 }
621
/// Folds the resolved key stream into grouped aggregate rows and assembles one page.
///
/// Two shapes are handled:
/// - the global DISTINCT field-aggregate shape, which is delegated to
///   `execute_global_distinct_field_aggregate` and never emits a continuation cursor;
/// - the regular grouped shape, which ingests every streamed row into one engine
///   per aggregate, finalizes the engines, aligns their outputs by group key,
///   applies HAVING and the resume boundary, and then pages the candidates in
///   ascending canonical group-key order.
///
/// # Errors
/// Propagates stream, read, deserialization, key-canonicalization, engine, and
/// HAVING evaluation errors, and returns executor-invariant errors when the
/// aggregate set is empty, a field-target aggregate reaches this stage, or
/// finalized aggregate outputs are misaligned or duplicated.
#[expect(clippy::too_many_lines)]
fn execute_group_fold(
    route: &GroupedRouteStage<E>,
    mut stream: GroupedStreamStage<'_, E>,
) -> Result<GroupedFoldStage, InternalError> {
    let mut grouped_execution_context =
        grouped_execution_context_from_planner_config(Some(route.grouped_execution));
    // Only consulted by debug assertions below.
    let max_groups_bound =
        usize::try_from(grouped_execution_context.config().max_groups()).unwrap_or(usize::MAX);
    let grouped_budget = grouped_budget_observability(&grouped_execution_context);
    debug_assert!(
        grouped_budget.max_groups() >= grouped_budget.groups()
            && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
            && grouped_execution_context
                .config()
                .max_distinct_values_total()
                >= grouped_budget.distinct_values()
            && grouped_budget.aggregate_states() >= grouped_budget.groups(),
        "grouped budget observability invariants must hold at grouped route entry",
    );

    // One engine per aggregate for the regular shape; the global DISTINCT shape uses none.
    // `short_circuit_keys[i]` collects the canonical group values for which engine `i`
    // has signalled `FoldControl::Break`.
    let (mut grouped_engines, mut short_circuit_keys) =
        if route.global_distinct_field_aggregate.is_none() {
            let grouped_engines = route
                .grouped_aggregates
                .iter()
                .map(|aggregate| {
                    // Field-target aggregates are only valid in the global DISTINCT shape.
                    if aggregate.target_field().is_some() {
                        return Err(InternalError::query_executor_invariant(format!(
                            "grouped field-target aggregate reached executor after planning: {:?}",
                            aggregate.kind()
                        )));
                    }

                    Ok(grouped_execution_context.create_grouped_engine::<E>(
                        aggregate.kind(),
                        aggregate_materialized_fold_direction(aggregate.kind()),
                        aggregate.distinct(),
                    ))
                })
                .collect::<Result<Vec<_>, _>>()?;
            let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];

            (grouped_engines, short_circuit_keys)
        } else {
            (Vec::new(), Vec::new())
        };
    let mut scanned_rows = 0usize;
    let mut filtered_rows = 0usize;
    let compiled_predicate = stream.execution_preparation.compiled_predicate();

    // Global DISTINCT field-aggregate shape: single implicit group, no continuation.
    if let Some((aggregate_kind, target_field)) = route.global_distinct_field_aggregate.as_ref()
    {
        if !route.cursor.is_empty() {
            return Err(InternalError::from_cursor_plan_error(
                crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
                    "global DISTINCT grouped aggregates do not support continuation cursors",
                ),
            ));
        }

        let global_row = Self::execute_global_distinct_field_aggregate(
            &route.plan,
            &stream.ctx,
            &mut stream.resolved,
            compiled_predicate,
            &mut grouped_execution_context,
            (*aggregate_kind, target_field.as_str()),
            (&mut scanned_rows, &mut filtered_rows),
        )?;
        let page_rows = Self::page_global_distinct_grouped_row(
            global_row,
            route.plan.scalar_plan().page.as_ref(),
        );
        // Prefer the stream's own scan count when it supplies one.
        let rows_scanned = stream
            .resolved
            .rows_scanned_override
            .unwrap_or(scanned_rows);
        let optimization = stream.resolved.optimization;
        let index_predicate_applied = stream.resolved.index_predicate_applied;
        let index_predicate_keys_rejected = stream.resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = stream
            .resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());

        return Ok(GroupedFoldStage {
            page: GroupedCursorPage {
                rows: page_rows,
                next_cursor: None,
            },
            filtered_rows,
            // The filtered-rows upper bound is not asserted for this shape.
            check_filtered_rows_upper_bound: false,
            rows_scanned,
            optimization,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        });
    }

    // Regular grouped shape: ingest every streamed row into each aggregate engine.
    while let Some(key) = stream.resolved.key_stream.next_key()? {
        let row = match route.plan.scalar_plan().consistency {
            MissingRowPolicy::Error => stream.ctx.read_strict(&key),
            MissingRowPolicy::Ignore => stream.ctx.read(&key),
        };
        // NOTE(review): not-found errors are skipped under both policies; presumably
        // `read_strict` reports missing rows with a different error kind — confirm.
        let row = match row {
            Ok(row) => row,
            Err(err) if err.is_not_found() => continue,
            Err(err) => return Err(err),
        };
        // Counted before deserialization and predicate evaluation.
        scanned_rows = scanned_rows.saturating_add(1);
        let (id, entity) = Context::<E>::deserialize_row((key, row))?;
        if let Some(compiled_predicate) = compiled_predicate
            && !compiled_predicate.eval(&entity)
        {
            continue;
        }
        filtered_rows = filtered_rows.saturating_add(1);

        // Build the canonical group key from the entity's group-field values.
        let group_values = route
            .group_fields
            .iter()
            .map(|field| {
                entity.get_value_by_index(field.index()).ok_or_else(|| {
                    InternalError::query_executor_invariant(format!(
                        "grouped field slot missing on entity: index={}",
                        field.index()
                    ))
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        let group_key = Value::List(group_values)
            .canonical_key()
            .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
        let canonical_group_value = group_key.canonical_value().clone();
        let data_key = DataKey::try_new::<E>(id.key())?;

        for (index, engine) in grouped_engines.iter_mut().enumerate() {
            // Skip engines that already signalled Break for this group.
            // NOTE(review): this is a linear scan per row per engine; it may become
            // costly if many groups short-circuit — verify against expected workloads.
            if short_circuit_keys[index].iter().any(|done| {
                canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
            }) {
                continue;
            }

            let fold_control = engine
                .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                .map_err(Self::map_group_error)?;
            if matches!(fold_control, FoldControl::Break) {
                short_circuit_keys[index].push(canonical_group_value.clone());
                debug_assert!(
                    short_circuit_keys[index].len() <= max_groups_bound,
                    "grouped short-circuit key tracking must stay bounded by max_groups",
                );
            }
        }
    }

    let aggregate_count = grouped_engines.len();
    // NOTE(review): this invariant is only checked after the whole stream has been
    // scanned; it also guards the `drain(..1)` below, which would panic on an empty vec.
    if aggregate_count == 0 {
        return Err(InternalError::query_executor_invariant(
            "grouped execution requires at least one aggregate terminal",
        ));
    }
    let mut finalized_iters = grouped_engines
        .into_iter()
        .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
        .collect::<Result<Vec<_>, _>>()?;
    // The first aggregate drives iteration; the remaining ones are walked in lockstep.
    let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
        InternalError::query_executor_invariant("missing grouped primary iterator")
    })?;

    let initial_offset = route
        .plan
        .scalar_plan()
        .page
        .as_ref()
        .map_or(0, |page| page.offset);
    // Offset recorded in any emitted token: the plan's offset on the first page,
    // otherwise the offset carried by the incoming cursor.
    let resume_initial_offset = if route.cursor.is_empty() {
        initial_offset
    } else {
        route.cursor.initial_offset()
    };
    let resume_boundary = route
        .cursor
        .last_group_key()
        .map(|last_group_key| Value::List(last_group_key.to_vec()));
    // The offset is only skipped on the first page; resumed pages skip via the boundary.
    let apply_initial_offset = route.cursor.is_empty();
    // A limit that does not fit in `usize` is treated as no limit.
    let limit = route
        .plan
        .scalar_plan()
        .page
        .as_ref()
        .and_then(|page| page.limit)
        .and_then(|limit| usize::try_from(limit).ok());
    let initial_offset_for_page = if apply_initial_offset {
        usize::try_from(initial_offset).unwrap_or(usize::MAX)
    } else {
        0
    };
    // limit + offset + 1: the extra row is the lookahead used to detect a further page.
    // `None` (no limit, or arithmetic overflow) means candidates are collected unbounded.
    let selection_bound = limit.and_then(|limit| {
        limit
            .checked_add(initial_offset_for_page)
            .and_then(|count| count.checked_add(1))
    });
    let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
    // A zero limit produces an empty page without walking the finalized outputs.
    if limit.is_none_or(|limit| limit != 0) {
        for primary_output in primary_iter.by_ref() {
            let group_key_value = primary_output.group_key().canonical_value().clone();
            let mut aggregate_values = Vec::with_capacity(aggregate_count);
            aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
            // Every sibling must yield the same group key at the same position.
            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
                let sibling_output = sibling_iter.next().ok_or_else(|| {
                    InternalError::query_executor_invariant(format!(
                        "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
                    ))
                })?;
                let sibling_group_key = sibling_output.group_key().canonical_value();
                if canonical_value_compare(sibling_group_key, &group_key_value)
                    != Ordering::Equal
                {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
                    )));
                }
                aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
            }
            debug_assert_eq!(
                aggregate_values.len(),
                aggregate_count,
                "grouped aggregate value alignment must preserve declared aggregate count",
            );
            // HAVING is evaluated before the resume boundary is applied.
            if let Some(grouped_having) = route.grouped_having.as_ref()
                && !Self::group_matches_having(
                    grouped_having,
                    route.group_fields.as_slice(),
                    &group_key_value,
                    aggregate_values.as_slice(),
                )?
            {
                continue;
            }

            // Drop groups at or before the last key emitted on the previous page.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary)
                    != Ordering::Greater
            {
                continue;
            }

            if let Some(selection_bound) = selection_bound {
                // Bounded mode: keep a sorted window of the smallest keys, evicting the
                // largest whenever the window exceeds `selection_bound`.
                match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
                    canonical_value_compare(existing_key, &group_key_value)
                }) {
                    Ok(_) => {
                        return Err(InternalError::query_executor_invariant(format!(
                            "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
                        )));
                    }
                    Err(insert_index) => {
                        grouped_candidate_rows
                            .insert(insert_index, (group_key_value, aggregate_values));
                        if grouped_candidate_rows.len() > selection_bound {
                            let _ = grouped_candidate_rows.pop();
                        }
                        debug_assert!(
                            grouped_candidate_rows.len() <= selection_bound,
                            "bounded grouped candidate rows must stay <= selection_bound",
                        );
                    }
                }
            } else {
                // Unbounded mode: collect everything and sort once afterwards.
                grouped_candidate_rows.push((group_key_value, aggregate_values));
                debug_assert!(
                    grouped_candidate_rows.len() <= max_groups_bound,
                    "grouped candidate rows must stay bounded by max_groups",
                );
            }
        }
        // Siblings must be exhausted together with the primary iterator.
        for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
            if sibling_iter.next().is_some() {
                return Err(InternalError::query_executor_invariant(format!(
                    "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
                )));
            }
        }
        if selection_bound.is_none() {
            grouped_candidate_rows
                .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
        }
    }
    if let Some(selection_bound) = selection_bound {
        debug_assert!(
            grouped_candidate_rows.len() <= selection_bound,
            "grouped candidate rows must remain bounded by selection_bound",
        );
    } else {
        debug_assert!(
            grouped_candidate_rows.len() <= max_groups_bound,
            "grouped candidate rows must remain bounded by max_groups",
        );
    }

    // Page assembly: skip the offset, emit up to `limit` rows, and flag a further
    // page when a candidate remains after the limit is reached.
    let mut page_rows = Vec::<GroupedRow>::new();
    let mut last_emitted_group_key: Option<Vec<Value>> = None;
    let mut has_more = false;
    let mut groups_skipped_for_offset = 0usize;
    for (group_key_value, aggregate_values) in grouped_candidate_rows {
        if groups_skipped_for_offset < initial_offset_for_page {
            groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
            continue;
        }
        if let Some(limit) = limit
            && page_rows.len() >= limit
        {
            has_more = true;
            break;
        }

        let emitted_group_key = match group_key_value {
            Value::List(values) => values,
            value => {
                return Err(InternalError::query_executor_invariant(format!(
                    "grouped canonical key must be Value::List, found {value:?}"
                )));
            }
        };
        last_emitted_group_key = Some(emitted_group_key.clone());
        page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        debug_assert!(
            limit.is_none_or(|bounded_limit| page_rows.len() <= bounded_limit),
            "grouped page rows must not exceed explicit page limit",
        );
    }

    // The token always records `Direction::Asc`; candidates above are ordered
    // ascending by `canonical_value_compare` regardless of `route.direction`.
    let next_cursor = if has_more {
        last_emitted_group_key.map(|last_group_key| {
            PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                route.continuation_signature,
                last_group_key,
                Direction::Asc,
                resume_initial_offset,
            ))
        })
    } else {
        None
    };
    // Prefer the stream's own scan count when it supplies one.
    let rows_scanned = stream
        .resolved
        .rows_scanned_override
        .unwrap_or(scanned_rows);
    let optimization = stream.resolved.optimization;
    let index_predicate_applied = stream.resolved.index_predicate_applied;
    let index_predicate_keys_rejected = stream.resolved.index_predicate_keys_rejected;
    let distinct_keys_deduped = stream
        .resolved
        .distinct_keys_deduped_counter
        .as_ref()
        .map_or(0, |counter| counter.get());

    Ok(GroupedFoldStage {
        page: GroupedCursorPage {
            rows: page_rows,
            next_cursor,
        },
        filtered_rows,
        check_filtered_rows_upper_bound: true,
        rows_scanned,
        optimization,
        index_predicate_applied,
        index_predicate_keys_rejected,
        distinct_keys_deduped,
    })
}
1006
1007 fn finalize_grouped_output(
1009 mut route: GroupedRouteStage<E>,
1010 folded: GroupedFoldStage,
1011 ) -> (GroupedCursorPage, Option<ExecutionTrace>) {
1012 let rows_returned = folded.page.rows.len();
1013 Self::finalize_path_outcome(
1014 &mut route.execution_trace,
1015 folded.optimization,
1016 folded.rows_scanned,
1017 rows_returned,
1018 folded.index_predicate_applied,
1019 folded.index_predicate_keys_rejected,
1020 folded.distinct_keys_deduped,
1021 );
1022
1023 let mut span = Span::<E>::new(ExecKind::Load);
1024 span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
1025 if folded.check_filtered_rows_upper_bound {
1026 debug_assert!(
1027 folded.filtered_rows >= rows_returned,
1028 "grouped pagination must return at most filtered row cardinality",
1029 );
1030 }
1031
1032 (folded.page, route.execution_trace)
1033 }
1034
1035 fn map_group_error(err: GroupError) -> InternalError {
1037 match err {
1038 GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
1039 InternalError::executor_internal(err.to_string())
1040 }
1041 GroupError::Internal(inner) => inner,
1042 }
1043 }
1044
1045 fn global_distinct_field_aggregate_spec(
1048 group_fields: &[crate::db::query::plan::FieldSlot],
1049 aggregates: &[GroupAggregateSpec],
1050 having: Option<&GroupHavingSpec>,
1051 ) -> Result<Option<(AggregateKind, String)>, InternalError> {
1052 if !group_fields.is_empty() {
1053 return Ok(None);
1054 }
1055 if aggregates.is_empty() {
1056 return Ok(None);
1057 }
1058 if aggregates
1059 .iter()
1060 .all(|aggregate| aggregate.target_field().is_none())
1061 {
1062 return Ok(None);
1063 }
1064 if having.is_some() {
1065 return Err(InternalError::query_executor_invariant(
1066 "global DISTINCT grouped aggregate shape does not support HAVING",
1067 ));
1068 }
1069 if aggregates.len() != 1 {
1070 return Err(InternalError::query_executor_invariant(
1071 "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
1072 ));
1073 }
1074
1075 let aggregate = &aggregates[0];
1076 let Some(target_field) = aggregate.target_field() else {
1077 return Err(InternalError::query_executor_invariant(
1078 "global DISTINCT grouped aggregate shape requires field-target aggregate",
1079 ));
1080 };
1081 if !aggregate.distinct() {
1082 return Err(InternalError::query_executor_invariant(
1083 "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
1084 ));
1085 }
1086 if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
1087 return Err(InternalError::query_executor_invariant(format!(
1088 "unsupported global DISTINCT grouped aggregate kind: {:?}",
1089 aggregate.kind()
1090 )));
1091 }
1092
1093 Ok(Some((aggregate.kind(), target_field.to_string())))
1094 }
1095
1096 fn execute_global_distinct_field_aggregate(
1099 plan: &AccessPlannedQuery<E::Key>,
1100 ctx: &Context<'_, E>,
1101 resolved: &mut ResolvedExecutionKeyStream,
1102 compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
1103 grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
1104 aggregate_spec: (AggregateKind, &str),
1105 row_counters: (&mut usize, &mut usize),
1106 ) -> Result<GroupedRow, InternalError> {
1107 let (aggregate_kind, target_field) = aggregate_spec;
1108 let (scanned_rows, filtered_rows) = row_counters;
1109 let field_slot = if aggregate_kind.is_sum() {
1110 Self::resolve_numeric_field_slot(target_field)?
1111 } else {
1112 Self::resolve_any_field_slot(target_field)?
1113 };
1114 let mut distinct_values = GroupKeySet::new();
1115 let mut count = 0u32;
1116 let mut sum = Decimal::ZERO;
1117 let mut saw_sum_value = false;
1118
1119 grouped_execution_context
1120 .record_implicit_single_group::<E>()
1121 .map_err(Self::map_group_error)?;
1122
1123 while let Some(key) = resolved.key_stream.next_key()? {
1124 let row = match plan.scalar_plan().consistency {
1125 MissingRowPolicy::Error => ctx.read_strict(&key),
1126 MissingRowPolicy::Ignore => ctx.read(&key),
1127 };
1128 let row = match row {
1129 Ok(row) => row,
1130 Err(err) if err.is_not_found() => continue,
1131 Err(err) => return Err(err),
1132 };
1133 *scanned_rows = scanned_rows.saturating_add(1);
1134 let (_, entity) = Context::<E>::deserialize_row((key, row))?;
1135 if let Some(compiled_predicate) = compiled_predicate
1136 && !compiled_predicate.eval(&entity)
1137 {
1138 continue;
1139 }
1140 *filtered_rows = filtered_rows.saturating_add(1);
1141
1142 let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
1143 .map_err(AggregateFieldValueError::into_internal_error)?;
1144 let distinct_key = distinct_value
1145 .canonical_key()
1146 .map_err(KeyCanonicalError::into_internal_error)?;
1147 let distinct_admitted = grouped_execution_context
1148 .admit_distinct_key(
1149 &mut distinct_values,
1150 grouped_execution_context
1151 .config()
1152 .max_distinct_values_per_group(),
1153 distinct_key,
1154 )
1155 .map_err(Self::map_group_error)?;
1156 if !distinct_admitted {
1157 continue;
1158 }
1159
1160 if aggregate_kind.is_sum() {
1161 let numeric_value =
1162 extract_numeric_field_decimal(&entity, target_field, field_slot)
1163 .map_err(AggregateFieldValueError::into_internal_error)?;
1164 sum += numeric_value;
1165 saw_sum_value = true;
1166 } else {
1167 count = count.saturating_add(1);
1168 }
1169 }
1170
1171 let aggregate_value = if aggregate_kind.is_sum() {
1172 if saw_sum_value {
1173 Value::Decimal(sum)
1174 } else {
1175 Value::Null
1176 }
1177 } else {
1178 Value::Uint(u64::from(count))
1179 };
1180
1181 Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
1182 }
1183
1184 fn page_global_distinct_grouped_row(
1186 row: GroupedRow,
1187 page: Option<&crate::db::query::plan::PageSpec>,
1188 ) -> Vec<GroupedRow> {
1189 let Some(page) = page else {
1190 return vec![row];
1191 };
1192 if page.offset > 0 || page.limit == Some(0) {
1193 return Vec::new();
1194 }
1195
1196 vec![row]
1197 }
1198
1199 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1201 match output {
1202 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1203 AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1204 AggregateOutput::Exists(value) => Value::Bool(*value),
1205 AggregateOutput::Min(value)
1206 | AggregateOutput::Max(value)
1207 | AggregateOutput::First(value)
1208 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1209 }
1210 }
1211
1212 fn group_matches_having(
1214 having: &GroupHavingSpec,
1215 group_fields: &[crate::db::query::plan::FieldSlot],
1216 group_key_value: &Value,
1217 aggregate_values: &[Value],
1218 ) -> Result<bool, InternalError> {
1219 for (index, clause) in having.clauses().iter().enumerate() {
1220 let actual = match clause.symbol() {
1221 GroupHavingSymbol::GroupField(field_slot) => {
1222 let group_key_list = match group_key_value {
1223 Value::List(values) => values,
1224 value => {
1225 return Err(InternalError::query_executor_invariant(format!(
1226 "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1227 )));
1228 }
1229 };
1230 let Some(group_field_offset) = group_fields
1231 .iter()
1232 .position(|group_field| group_field.index() == field_slot.index())
1233 else {
1234 return Err(InternalError::query_executor_invariant(format!(
1235 "grouped HAVING field is not in grouped key projection: field='{}'",
1236 field_slot.field()
1237 )));
1238 };
1239 group_key_list.get(group_field_offset).ok_or_else(|| {
1240 InternalError::query_executor_invariant(format!(
1241 "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1242 group_key_list.len()
1243 ))
1244 })?
1245 }
1246 GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1247 aggregate_values.get(*aggregate_index).ok_or_else(|| {
1248 InternalError::query_executor_invariant(format!(
1249 "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1250 aggregate_values.len()
1251 ))
1252 })?
1253 }
1254 };
1255
1256 if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1257 return Ok(false);
1258 }
1259 }
1260
1261 Ok(true)
1262 }
1263
1264 fn having_compare_values(
1266 actual: &Value,
1267 op: CompareOp,
1268 expected: &Value,
1269 ) -> Result<bool, InternalError> {
1270 let strict = CoercionSpec::default();
1271 let matches = match op {
1272 CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1273 CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1274 CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1275 CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1276 CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1277 CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1278 CompareOp::In
1279 | CompareOp::NotIn
1280 | CompareOp::Contains
1281 | CompareOp::StartsWith
1282 | CompareOp::EndsWith => {
1283 return Err(InternalError::query_executor_invariant(format!(
1284 "unsupported grouped HAVING operator reached executor: {op:?}"
1285 )));
1286 }
1287 };
1288
1289 Ok(matches)
1290 }
1291
1292 fn finalize_path_outcome(
1294 execution_trace: &mut Option<ExecutionTrace>,
1295 optimization: Option<ExecutionOptimization>,
1296 rows_scanned: usize,
1297 rows_returned: usize,
1298 index_predicate_applied: bool,
1299 index_predicate_keys_rejected: u64,
1300 distinct_keys_deduped: u64,
1301 ) {
1302 record_rows_scanned::<E>(rows_scanned);
1303 if let Some(execution_trace) = execution_trace.as_mut() {
1304 execution_trace.set_path_outcome(
1305 optimization,
1306 rows_scanned,
1307 rows_returned,
1308 index_predicate_applied,
1309 index_predicate_keys_rejected,
1310 distinct_keys_deduped,
1311 );
1312 debug_assert_eq!(
1313 execution_trace.keys_scanned,
1314 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1315 "execution trace keys_scanned must match rows_scanned metrics input",
1316 );
1317 }
1318 }
1319
1320 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1322 plan: &AccessPlannedQuery<E::Key>,
1323 cursor_boundary: Option<&CursorBoundary>,
1324 ) -> Result<(), InternalError> {
1325 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1326 return Ok(());
1327 }
1328 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1329
1330 Ok(())
1331 }
1332}