1#![deny(unreachable_patterns)]
6
7mod execute;
8mod fast_stream;
9mod index_range_limit;
10mod page;
11mod pk_stream;
12mod secondary_index;
13mod terminal;
14mod trace;
15
16pub(in crate::db::executor) use self::execute::{
17 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
18};
19
20use self::trace::{access_path_variant, execution_order_direction};
21use crate::{
22 db::{
23 Context, Db, GroupedRow,
24 access::AccessPlan,
25 contracts::canonical_value_compare,
26 cursor::{
27 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
28 PlannedCursor, decode_pk_cursor_boundary,
29 },
30 data::DataKey,
31 direction::Direction,
32 executor::{
33 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
34 KeyOrderComparator, OrderedKeyStreamBox,
35 aggregate::field::{
36 AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
37 extract_orderable_field_value, resolve_any_aggregate_target_slot,
38 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
39 },
40 aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
41 group::{
42 CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
43 grouped_execution_context_from_planner_config,
44 },
45 plan_metrics::{
46 GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
47 record_rows_scanned,
48 },
49 range_token_anchor_key, range_token_from_cursor_anchor,
50 route::aggregate_materialized_fold_direction,
51 validate_executor_plan,
52 },
53 index::IndexCompilePolicy,
54 predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
55 query::plan::{
56 AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
57 LogicalPlan, OrderDirection, grouped_executor_handoff,
58 },
59 response::Response,
60 },
61 error::InternalError,
62 obs::sink::{ExecKind, Span},
63 traits::{EntityKind, EntityValue},
64 types::Decimal,
65 value::Value,
66};
67use std::{cmp::Ordering, marker::PhantomData};
68
69#[derive(Clone, Debug, Eq, PartialEq)]
75pub(in crate::db) enum PageCursor {
76 Scalar(ContinuationToken),
77 Grouped(GroupedContinuationToken),
78}
79
80impl PageCursor {
81 #[must_use]
83 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
84 match self {
85 Self::Scalar(token) => Some(token),
86 Self::Grouped(_) => None,
87 }
88 }
89
90 #[must_use]
92 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
93 match self {
94 Self::Scalar(_) => None,
95 Self::Grouped(token) => Some(token),
96 }
97 }
98}
99
100impl From<ContinuationToken> for PageCursor {
101 fn from(value: ContinuationToken) -> Self {
102 Self::Scalar(value)
103 }
104}
105
106impl From<GroupedContinuationToken> for PageCursor {
107 fn from(value: GroupedContinuationToken) -> Self {
108 Self::Grouped(value)
109 }
110}
111
112#[derive(Debug)]
120pub(crate) struct CursorPage<E: EntityKind> {
121 pub(crate) items: Response<E>,
122
123 pub(crate) next_cursor: Option<PageCursor>,
124}
125
126#[derive(Debug)]
132pub(in crate::db) struct GroupedCursorPage {
133 pub(in crate::db) rows: Vec<GroupedRow>,
134 pub(in crate::db) next_cursor: Option<PageCursor>,
135}
136
/// Coarse label for the access path an execution used, as reported in
/// [`ExecutionTrace::access_path_variant`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Lookup of a single key.
    ByKey,
    /// Lookup of several keys.
    ByKeys,
    /// Scan over a key range.
    KeyRange,
    /// Index access by prefix.
    IndexPrefix,
    /// Index access by range.
    IndexRange,
    /// Scan over everything.
    FullScan,
    /// Union of child access paths.
    Union,
    /// Intersection of child access paths.
    Intersection,
}
154
/// Optimization label reported in [`ExecutionTrace::optimization`] when an
/// execution took an optimized path.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key optimization.
    PrimaryKey,
    /// Ordering pushed down to a secondary index.
    SecondaryOrderPushdown,
    /// Limit pushed down into an index range access.
    IndexRangeLimitPushdown,
}
167
168#[derive(Clone, Copy, Debug, Eq, PartialEq)]
176pub struct ExecutionTrace {
177 pub access_path_variant: ExecutionAccessPathVariant,
178 pub direction: OrderDirection,
179 pub optimization: Option<ExecutionOptimization>,
180 pub keys_scanned: u64,
181 pub rows_returned: u64,
182 pub continuation_applied: bool,
183 pub index_predicate_applied: bool,
184 pub index_predicate_keys_rejected: u64,
185 pub distinct_keys_deduped: u64,
186}
187
188impl ExecutionTrace {
189 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190 Self {
191 access_path_variant: access_path_variant(access),
192 direction: execution_order_direction(direction),
193 optimization: None,
194 keys_scanned: 0,
195 rows_returned: 0,
196 continuation_applied,
197 index_predicate_applied: false,
198 index_predicate_keys_rejected: 0,
199 distinct_keys_deduped: 0,
200 }
201 }
202
203 fn set_path_outcome(
204 &mut self,
205 optimization: Option<ExecutionOptimization>,
206 keys_scanned: usize,
207 rows_returned: usize,
208 index_predicate_applied: bool,
209 index_predicate_keys_rejected: u64,
210 distinct_keys_deduped: u64,
211 ) {
212 self.optimization = optimization;
213 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215 self.index_predicate_applied = index_predicate_applied;
216 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217 self.distinct_keys_deduped = distinct_keys_deduped;
218 }
219}
220
221pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
223 direction: Direction,
224) -> KeyOrderComparator {
225 KeyOrderComparator::from_direction(direction)
226}
227
228pub(in crate::db::executor) struct FastPathKeyResult {
236 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
237 pub(in crate::db::executor) rows_scanned: usize,
238 pub(in crate::db::executor) optimization: ExecutionOptimization,
239}
240
241#[derive(Clone)]
249pub(crate) struct LoadExecutor<E: EntityKind> {
250 db: Db<E::Canister>,
251 debug: bool,
252 _marker: PhantomData<E>,
253}
254
255impl<E> LoadExecutor<E>
256where
257 E: EntityKind + EntityValue,
258{
259 #[must_use]
261 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
262 Self {
263 db,
264 debug,
265 _marker: PhantomData,
266 }
267 }
268
269 pub(in crate::db::executor) fn recovered_context(
271 &self,
272 ) -> Result<crate::db::Context<'_, E>, InternalError> {
273 self.db.recovered_context::<E>()
274 }
275
276 pub(in crate::db::executor) fn resolve_orderable_field_slot(
279 target_field: &str,
280 ) -> Result<FieldSlot, InternalError> {
281 resolve_orderable_aggregate_target_slot::<E>(target_field)
282 .map_err(AggregateFieldValueError::into_internal_error)
283 }
284
285 pub(in crate::db::executor) fn resolve_any_field_slot(
288 target_field: &str,
289 ) -> Result<FieldSlot, InternalError> {
290 resolve_any_aggregate_target_slot::<E>(target_field)
291 .map_err(AggregateFieldValueError::into_internal_error)
292 }
293
294 pub(in crate::db::executor) fn resolve_numeric_field_slot(
297 target_field: &str,
298 ) -> Result<FieldSlot, InternalError> {
299 resolve_numeric_aggregate_target_slot::<E>(target_field)
300 .map_err(AggregateFieldValueError::into_internal_error)
301 }
302
303 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
304 self.execute_paged_with_cursor(plan, PlannedCursor::none())
305 .map(|page| page.items)
306 }
307
308 pub(in crate::db) fn execute_paged_with_cursor(
309 &self,
310 plan: ExecutablePlan<E>,
311 cursor: impl Into<PlannedCursor>,
312 ) -> Result<CursorPage<E>, InternalError> {
313 self.execute_paged_with_cursor_traced(plan, cursor)
314 .map(|(page, _)| page)
315 }
316
317 pub(in crate::db) fn execute_paged_with_cursor_traced(
318 &self,
319 plan: ExecutablePlan<E>,
320 cursor: impl Into<PlannedCursor>,
321 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
322 if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
323 return Err(InternalError::query_executor_invariant(
324 "grouped plans require execute_grouped pagination entrypoints",
325 ));
326 }
327
328 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
329 let cursor_boundary = cursor.boundary().cloned();
330 let index_range_token = cursor
331 .index_range_anchor()
332 .map(range_token_from_cursor_anchor);
333
334 if !plan.mode().is_load() {
335 return Err(InternalError::query_executor_invariant(
336 "load executor requires load plans",
337 ));
338 }
339
340 let continuation_signature = plan.continuation_signature();
341 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
342 let index_range_specs = plan.index_range_specs()?.to_vec();
343 let route_plan = Self::build_execution_route_plan_for_load(
344 plan.as_inner(),
345 cursor_boundary.as_ref(),
346 index_range_token.as_ref(),
347 None,
348 )?;
349 let continuation_applied = !matches!(
350 route_plan.continuation_mode(),
351 crate::db::executor::route::ContinuationMode::Initial
352 );
353 let direction = route_plan.direction();
354 debug_assert_eq!(
355 route_plan.window().effective_offset,
356 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
357 "route window effective offset must match logical plan offset semantics",
358 );
359 let mut execution_trace = self
360 .debug
361 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
362 let plan = plan.into_inner();
363 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
364
365 let result = (|| {
366 let mut span = Span::<E>::new(ExecKind::Load);
367
368 validate_executor_plan::<E>(&plan)?;
369 let ctx = self.db.recovered_context::<E>()?;
370 let execution_inputs = ExecutionInputs {
371 ctx: &ctx,
372 plan: &plan,
373 stream_bindings: AccessStreamBindings {
374 index_prefix_specs: index_prefix_specs.as_slice(),
375 index_range_specs: index_range_specs.as_slice(),
376 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
377 direction,
378 },
379 execution_preparation: &execution_preparation,
380 };
381
382 record_plan_metrics(&plan.access);
383 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
386 &execution_inputs,
387 &route_plan,
388 cursor_boundary.as_ref(),
389 continuation_signature,
390 IndexCompilePolicy::ConservativeSubset,
391 )?;
392 let page = materialized.page;
393 let rows_scanned = materialized.rows_scanned;
394 let post_access_rows = materialized.post_access_rows;
395 let optimization = materialized.optimization;
396 let index_predicate_applied = materialized.index_predicate_applied;
397 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
398 let distinct_keys_deduped = materialized.distinct_keys_deduped;
399
400 Ok(Self::finalize_execution(
401 page,
402 optimization,
403 rows_scanned,
404 post_access_rows,
405 index_predicate_applied,
406 index_predicate_keys_rejected,
407 distinct_keys_deduped,
408 &mut span,
409 &mut execution_trace,
410 ))
411 })();
412
413 result.map(|page| (page, execution_trace))
414 }
415
416 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
417 &self,
418 plan: ExecutablePlan<E>,
419 cursor: impl Into<GroupedPlannedCursor>,
420 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
421 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
422 return Err(InternalError::query_executor_invariant(
423 "grouped execution requires grouped logical plans",
424 ));
425 }
426
427 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
428
429 self.execute_grouped_path(plan, cursor)
430 }
431
/// Runs a grouped plan end to end and returns one page of grouped rows plus
/// the optional execution trace.
///
/// Phases, in order:
/// 1. validate the plan and pull group fields, aggregates and HAVING from the
///    grouped handoff; build the grouped route plan;
/// 2. create one grouped engine per aggregate, unless the plan has the
///    global DISTINCT field-aggregate shape, which is executed by a dedicated
///    helper and returned early;
/// 3. stream keys, read and deserialize rows, apply the compiled predicate,
///    and fold each surviving row into every engine;
/// 4. finalize the engines, align their outputs per group, apply HAVING and
///    the cursor resume boundary, and keep candidates in canonical key order;
/// 5. apply offset/limit and build the next cursor when more groups remain.
///
/// # Errors
///
/// Returns executor-invariant errors for violated internal expectations
/// (missing observability payload, field-target aggregates, zero aggregates,
/// misaligned sibling outputs, duplicate or non-list group keys), a cursor
/// plan error when a cursor is supplied for the global DISTINCT shape, and
/// forwards errors from the helpers it calls.
432 #[expect(clippy::too_many_lines)]
434 fn execute_grouped_path(
435 &self,
436 plan: ExecutablePlan<E>,
437 cursor: GroupedPlannedCursor,
438 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
    // Phase 1: validate and unpack the grouped handoff. The slices are copied
    // (`to_vec` / `cloned`) because the handoff itself is moved into route planning.
439 validate_executor_plan::<E>(plan.as_inner())?;
440 let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
441 let grouped_execution = grouped_handoff.execution();
442 let group_fields = grouped_handoff.group_fields().to_vec();
443 let grouped_aggregates = grouped_handoff.aggregates().to_vec();
444 let grouped_having = grouped_handoff.having().cloned();
445 let grouped_route_plan =
446 Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
447 let grouped_route_observability =
448 grouped_route_plan.grouped_observability().ok_or_else(|| {
449 InternalError::query_executor_invariant(
450 "grouped route planning must emit grouped observability payload",
451 )
452 })?;
453 let direction = grouped_route_plan.direction();
    // Any non-empty cursor counts as a continuation for tracing purposes.
454 let continuation_applied = !cursor.is_empty();
455 let mut execution_trace = self
456 .debug
457 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
458 let continuation_signature = plan.continuation_signature();
459 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
460 let index_range_specs = plan.index_range_specs()?.to_vec();
461
    // Grouped execution context carries the budget configuration taken from the planner.
462 let mut grouped_execution_context =
463 grouped_execution_context_from_planner_config(Some(grouped_execution));
    // `max_groups` clamped to `usize`; only used by the debug assertions below.
464 let max_groups_bound =
465 usize::try_from(grouped_execution_context.config().max_groups()).unwrap_or(usize::MAX);
466 let grouped_budget = grouped_budget_observability(&grouped_execution_context);
467 debug_assert!(
468 grouped_budget.max_groups() >= grouped_budget.groups()
469 && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
470 && grouped_execution_context
471 .config()
472 .max_distinct_values_total()
473 >= grouped_budget.distinct_values()
474 && grouped_budget.aggregate_states() >= grouped_budget.groups(),
475 "grouped budget observability invariants must hold at grouped route entry"
476 );
477
    // Read the route decision; apart from the metrics strategy these values only
    // feed the debug-build consistency checks that follow.
478 let grouped_route_outcome = grouped_route_observability.outcome();
480 let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
481 let grouped_route_eligible = grouped_route_observability.eligible();
482 let grouped_route_execution_mode = grouped_route_observability.execution_mode();
    // Map the route-level strategy onto the metrics-level strategy enum.
483 let grouped_plan_metrics_strategy =
484 match grouped_route_observability.grouped_execution_strategy() {
485 crate::db::executor::route::GroupedExecutionStrategy::HashMaterialized => {
486 GroupedPlanMetricsStrategy::HashMaterialized
487 }
488 crate::db::executor::route::GroupedExecutionStrategy::OrderedMaterialized => {
489 GroupedPlanMetricsStrategy::OrderedMaterialized
490 }
491 };
492 debug_assert!(
493 grouped_route_eligible == grouped_route_rejection_reason.is_none(),
494 "grouped route eligibility and rejection reason must stay aligned",
495 );
496 debug_assert!(
497 grouped_route_outcome
498 != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
499 || grouped_route_rejection_reason.is_some(),
500 "grouped rejected outcomes must carry a rejection reason",
501 );
502 debug_assert!(
503 matches!(
504 grouped_route_execution_mode,
505 crate::db::executor::route::ExecutionMode::Materialized
506 ),
507 "grouped execution route must remain blocking/materialized",
508 );
    // Phase 2: detect the global DISTINCT field-aggregate shape. `Some` means the
    // dedicated helper runs below and no grouped engines are created.
509 let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
510 group_fields.as_slice(),
511 grouped_aggregates.as_slice(),
512 grouped_having.as_ref(),
513 )?;
    // One engine per aggregate, plus a parallel list of group keys for which that
    // engine has already signalled `FoldControl::Break`.
514 let (mut grouped_engines, mut short_circuit_keys) =
515 if global_distinct_field_aggregate.is_none() {
516 let grouped_engines = grouped_aggregates
517 .iter()
518 .map(|aggregate| {
                // Field-target aggregates are only accepted in the global DISTINCT shape.
519 if aggregate.target_field().is_some() {
520 return Err(InternalError::query_executor_invariant(format!(
521 "grouped field-target aggregate reached executor after planning: {:?}",
522 aggregate.kind()
523 )));
524 }
525
526 Ok(grouped_execution_context.create_grouped_engine::<E>(
527 aggregate.kind(),
528 aggregate_materialized_fold_direction(aggregate.kind()),
529 aggregate.distinct(),
530 ))
531 })
532 .collect::<Result<Vec<_>, _>>()?;
533 let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
534
535 (grouped_engines, short_circuit_keys)
536 } else {
537 (Vec::new(), Vec::new())
538 };
    // From here on `plan` is the inner planned query, no longer the executable wrapper.
539 let plan = plan.into_inner();
540 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
541
542 let mut span = Span::<E>::new(ExecKind::Load);
543 let ctx = self.db.recovered_context::<E>()?;
544 let execution_inputs = ExecutionInputs {
545 ctx: &ctx,
546 plan: &plan,
547 stream_bindings: AccessStreamBindings {
548 index_prefix_specs: index_prefix_specs.as_slice(),
549 index_range_specs: index_range_specs.as_slice(),
            // Grouped execution never passes an index-range anchor.
550 index_range_anchor: None,
551 direction,
552 },
553 execution_preparation: &execution_preparation,
554 };
555 record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
556 let mut resolved = Self::resolve_execution_key_stream_without_distinct(
557 &execution_inputs,
558 &grouped_route_plan,
559 IndexCompilePolicy::ConservativeSubset,
560 )?;
    // `scanned_rows` counts rows read successfully; `filtered_rows` counts rows
    // that also passed the compiled predicate.
561 let mut scanned_rows = 0usize;
562 let mut filtered_rows = 0usize;
563 let compiled_predicate = execution_preparation.compiled_predicate();
564
    // Global DISTINCT field-aggregate shape: single implicit group, no continuation.
565 if let Some((aggregate_kind, target_field)) = global_distinct_field_aggregate {
566 if !cursor.is_empty() {
567 return Err(InternalError::from_cursor_plan_error(
568 crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
569 "global DISTINCT grouped aggregates do not support continuation cursors",
570 ),
571 ));
572 }
573
574 let global_row = Self::execute_global_distinct_field_aggregate(
575 &plan,
576 &ctx,
577 &mut resolved,
578 compiled_predicate,
579 &mut grouped_execution_context,
580 (aggregate_kind, target_field.as_str()),
581 (&mut scanned_rows, &mut filtered_rows),
582 )?;
        // The single row is kept or dropped according to the page spec.
583 let page_rows = Self::page_global_distinct_grouped_row(
584 global_row,
585 plan.scalar_plan().page.as_ref(),
586 );
        // A scan count reported by the resolved stream takes precedence over the local counter.
587 let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
588 let optimization = resolved.optimization;
589 let index_predicate_applied = resolved.index_predicate_applied;
590 let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
591 let distinct_keys_deduped = resolved
592 .distinct_keys_deduped_counter
593 .as_ref()
594 .map_or(0, |counter| counter.get());
595 let rows_returned = page_rows.len();
596
597 Self::finalize_path_outcome(
598 &mut execution_trace,
599 optimization,
600 rows_scanned,
601 rows_returned,
602 index_predicate_applied,
603 index_predicate_keys_rejected,
604 distinct_keys_deduped,
605 );
606 span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
607
608 return Ok((
609 GroupedCursorPage {
610 rows: page_rows,
611 next_cursor: None,
612 },
613 execution_trace,
614 ));
615 }
616
    // Phase 3: fold every row that passes the predicate into each grouped engine.
617 while let Some(key) = resolved.key_stream.next_key()? {
        // The consistency policy selects between the strict and the lenient read.
619 let row = match plan.scalar_plan().consistency {
620 MissingRowPolicy::Error => ctx.read_strict(&key),
621 MissingRowPolicy::Ignore => ctx.read(&key),
622 };
        // A not-found error from either read skips the key; any other error aborts.
623 let row = match row {
624 Ok(row) => row,
625 Err(err) if err.is_not_found() => continue,
626 Err(err) => return Err(err),
627 };
628 scanned_rows = scanned_rows.saturating_add(1);
629 let (id, entity) = Context::<E>::deserialize_row((key, row))?;
630 if let Some(compiled_predicate) = compiled_predicate
631 && !compiled_predicate.eval(&entity)
632 {
633 continue;
634 }
635 filtered_rows = filtered_rows.saturating_add(1);
636
        // Build the group key from the entity's values at the grouped field slots.
637 let group_values = group_fields
638 .iter()
639 .map(|field| {
640 entity.get_value_by_index(field.index()).ok_or_else(|| {
641 InternalError::query_executor_invariant(format!(
642 "grouped field slot missing on entity: index={}",
643 field.index()
644 ))
645 })
646 })
647 .collect::<Result<Vec<_>, _>>()?;
648 let group_key = Value::List(group_values)
649 .canonical_key()
650 .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
651 let canonical_group_value = group_key.canonical_value().clone();
652 let data_key = DataKey::try_new::<E>(id.key())?;
653
654 for (index, engine) in grouped_engines.iter_mut().enumerate() {
            // Skip engines that already returned `Break` for this group.
            // NOTE(review): this is a linear scan over the recorded keys for every
            // row and engine — confirm the list stays small enough in practice.
655 if short_circuit_keys[index].iter().any(|done| {
656 canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
657 }) {
658 continue;
659 }
660
661 let fold_control = engine
662 .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
663 .map_err(Self::map_group_error)?;
            // Remember the group so later rows of it are not fed to this engine again.
664 if matches!(fold_control, FoldControl::Break) {
665 short_circuit_keys[index].push(canonical_group_value.clone());
666 debug_assert!(
667 short_circuit_keys[index].len() <= max_groups_bound,
668 "grouped short-circuit key tracking must stay bounded by max_groups",
669 );
670 }
671 }
672 }
673
    // Phase 4: finalize. With the global DISTINCT shape already returned above,
    // an empty engine list here means the plan declared no aggregates.
674 let aggregate_count = grouped_engines.len();
679 if aggregate_count == 0 {
680 return Err(InternalError::query_executor_invariant(
681 "grouped execution requires at least one aggregate terminal",
682 ));
683 }
684 let mut finalized_iters = grouped_engines
685 .into_iter()
686 .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
687 .collect::<Result<Vec<_>, _>>()?;
    // The first engine's output drives iteration; `drain(..1)` removes it from
    // the list so `finalized_iters` holds only the sibling iterators afterwards.
688 let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
689 InternalError::query_executor_invariant("missing grouped primary iterator")
690 })?;
691
    // Paging parameters. The page-spec offset is applied only on the first page
    // (empty cursor); on resume the cursor supplies the offset recorded in the
    // next token and no groups are skipped for offset.
692 let initial_offset = plan
694 .scalar_plan()
695 .page
696 .as_ref()
697 .map_or(0, |page| page.offset);
698 let resume_initial_offset = if cursor.is_empty() {
699 initial_offset
700 } else {
701 cursor.initial_offset()
702 };
703 let resume_boundary = cursor
704 .last_group_key()
705 .map(|last_group_key| Value::List(last_group_key.to_vec()));
706 let apply_initial_offset = cursor.is_empty();
    // A limit that does not fit `usize` is treated as no limit.
707 let limit = plan
708 .scalar_plan()
709 .page
710 .as_ref()
711 .and_then(|page| page.limit)
712 .and_then(|limit| usize::try_from(limit).ok());
713 let initial_offset_for_page = if apply_initial_offset {
714 usize::try_from(initial_offset).unwrap_or(usize::MAX)
715 } else {
716 0
717 };
    // Candidate cap = limit + offset + 1 (presumably one look-ahead row so the
    // paging loop can detect `has_more`). `None` when there is no limit or the
    // sum overflows, in which case all candidates are kept and sorted at the end.
718 let selection_bound = limit.and_then(|limit| {
719 limit
720 .checked_add(initial_offset_for_page)
721 .and_then(|count| count.checked_add(1))
722 });
723 let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
    // A limit of zero skips candidate collection (and the sibling checks) entirely.
724 if limit.is_none_or(|limit| limit != 0) {
725 for primary_output in primary_iter.by_ref() {
726 let group_key_value = primary_output.group_key().canonical_value().clone();
727 let mut aggregate_values = Vec::with_capacity(aggregate_count);
728 aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
            // Every sibling must yield the same group, in the same order, as the primary.
729 for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
730 let sibling_output = sibling_iter.next().ok_or_else(|| {
731 InternalError::query_executor_invariant(format!(
732 "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
733 ))
734 })?;
735 let sibling_group_key = sibling_output.group_key().canonical_value();
736 if canonical_value_compare(sibling_group_key, &group_key_value)
737 != Ordering::Equal
738 {
739 return Err(InternalError::query_executor_invariant(format!(
740 "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
741 )));
742 }
743 aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
744 }
745 debug_assert_eq!(
746 aggregate_values.len(),
747 aggregate_count,
748 "grouped aggregate value alignment must preserve declared aggregate count",
749 );
            // HAVING is evaluated before the resume boundary and before selection.
750 if let Some(grouped_having) = grouped_having.as_ref()
751 && !Self::group_matches_having(
752 grouped_having,
753 group_fields.as_slice(),
754 &group_key_value,
755 aggregate_values.as_slice(),
756 )?
757 {
758 continue;
759 }
760
            // On resume, keep only groups strictly after the cursor's last group key.
761 if let Some(resume_boundary) = resume_boundary.as_ref()
762 && canonical_value_compare(&group_key_value, resume_boundary)
763 != Ordering::Greater
764 {
765 continue;
766 }
767
768 if let Some(selection_bound) = selection_bound {
                // Bounded mode: keep the list sorted by canonical key via binary-search
                // insertion and drop the last (greatest) entry once the cap is exceeded.
771 match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
772 canonical_value_compare(existing_key, &group_key_value)
773 }) {
774 Ok(_) => {
775 return Err(InternalError::query_executor_invariant(format!(
776 "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
777 )));
778 }
779 Err(insert_index) => {
780 grouped_candidate_rows
781 .insert(insert_index, (group_key_value, aggregate_values));
782 if grouped_candidate_rows.len() > selection_bound {
783 let _ = grouped_candidate_rows.pop();
784 }
785 debug_assert!(
786 grouped_candidate_rows.len() <= selection_bound,
787 "bounded grouped candidate rows must stay <= selection_bound",
788 );
789 }
790 }
791 } else {
                // Unbounded mode: collect in finalize order; sorted once after the loop.
792 grouped_candidate_rows.push((group_key_value, aggregate_values));
793 debug_assert!(
794 grouped_candidate_rows.len() <= max_groups_bound,
795 "grouped candidate rows must stay bounded by max_groups",
796 );
797 }
798 }
        // After the primary is exhausted no sibling may have rows left over.
799 for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
800 if sibling_iter.next().is_some() {
801 return Err(InternalError::query_executor_invariant(format!(
802 "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
803 )));
804 }
805 }
806 if selection_bound.is_none() {
807 grouped_candidate_rows
808 .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
809 }
810 }
811 if let Some(selection_bound) = selection_bound {
812 debug_assert!(
813 grouped_candidate_rows.len() <= selection_bound,
814 "grouped candidate rows must remain bounded by selection_bound",
815 );
816 } else {
817 debug_assert!(
818 grouped_candidate_rows.len() <= max_groups_bound,
819 "grouped candidate rows must remain bounded by max_groups",
820 );
821 }
822
    // Phase 5: apply offset and limit over the candidates in canonical key order.
823 let mut page_rows = Vec::<GroupedRow>::new();
824 let mut last_emitted_group_key: Option<Vec<Value>> = None;
825 let mut has_more = false;
826 let mut groups_skipped_for_offset = 0usize;
827 for (group_key_value, aggregate_values) in grouped_candidate_rows {
828 if groups_skipped_for_offset < initial_offset_for_page {
829 groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
830 continue;
831 }
        // A candidate that arrives after the page is full proves more groups exist.
832 if let Some(limit) = limit
833 && page_rows.len() >= limit
834 {
835 has_more = true;
836 break;
837 }
838
839 let emitted_group_key = match group_key_value {
840 Value::List(values) => values,
841 value => {
842 return Err(InternalError::query_executor_invariant(format!(
843 "grouped canonical key must be Value::List, found {value:?}"
844 )));
845 }
846 };
847 last_emitted_group_key = Some(emitted_group_key.clone());
848 page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
849 debug_assert!(
850 limit.is_none_or(|bounded_limit| page_rows.len() <= bounded_limit),
851 "grouped page rows must not exceed explicit page limit",
852 );
853 }
854
    // A next cursor is emitted only when more groups remain; it is anchored on
    // the last emitted group key and always carries `Direction::Asc`.
855 let next_cursor = if has_more {
856 last_emitted_group_key.map(|last_group_key| {
857 PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
858 continuation_signature,
859 last_group_key,
860 Direction::Asc,
861 resume_initial_offset,
862 ))
863 })
864 } else {
865 None
866 };
    // A scan count reported by the resolved stream takes precedence over the local counter.
867 let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
868 let optimization = resolved.optimization;
869 let index_predicate_applied = resolved.index_predicate_applied;
870 let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
871 let distinct_keys_deduped = resolved
872 .distinct_keys_deduped_counter
873 .as_ref()
874 .map_or(0, |counter| counter.get());
875 let rows_returned = page_rows.len();
876
877 Self::finalize_path_outcome(
878 &mut execution_trace,
879 optimization,
880 rows_scanned,
881 rows_returned,
882 index_predicate_applied,
883 index_predicate_keys_rejected,
884 distinct_keys_deduped,
885 );
886 span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
887 debug_assert!(
888 filtered_rows >= rows_returned,
889 "grouped pagination must return at most filtered row cardinality",
890 );
891
892 Ok((
893 GroupedCursorPage {
894 rows: page_rows,
895 next_cursor,
896 },
897 execution_trace,
898 ))
899 }
900
901 fn map_group_error(err: GroupError) -> InternalError {
903 match err {
904 GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
905 InternalError::executor_internal(err.to_string())
906 }
907 GroupError::Internal(inner) => inner,
908 }
909 }
910
911 fn global_distinct_field_aggregate_spec(
914 group_fields: &[crate::db::query::plan::FieldSlot],
915 aggregates: &[GroupAggregateSpec],
916 having: Option<&GroupHavingSpec>,
917 ) -> Result<Option<(AggregateKind, String)>, InternalError> {
918 if !group_fields.is_empty() {
919 return Ok(None);
920 }
921 if aggregates.is_empty() {
922 return Ok(None);
923 }
924 if aggregates
925 .iter()
926 .all(|aggregate| aggregate.target_field().is_none())
927 {
928 return Ok(None);
929 }
930 if having.is_some() {
931 return Err(InternalError::query_executor_invariant(
932 "global DISTINCT grouped aggregate shape does not support HAVING",
933 ));
934 }
935 if aggregates.len() != 1 {
936 return Err(InternalError::query_executor_invariant(
937 "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
938 ));
939 }
940
941 let aggregate = &aggregates[0];
942 let Some(target_field) = aggregate.target_field() else {
943 return Err(InternalError::query_executor_invariant(
944 "global DISTINCT grouped aggregate shape requires field-target aggregate",
945 ));
946 };
947 if !aggregate.distinct() {
948 return Err(InternalError::query_executor_invariant(
949 "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
950 ));
951 }
952 if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
953 return Err(InternalError::query_executor_invariant(format!(
954 "unsupported global DISTINCT grouped aggregate kind: {:?}",
955 aggregate.kind()
956 )));
957 }
958
959 Ok(Some((aggregate.kind(), target_field.to_string())))
960 }
961
962 fn execute_global_distinct_field_aggregate(
965 plan: &AccessPlannedQuery<E::Key>,
966 ctx: &Context<'_, E>,
967 resolved: &mut ResolvedExecutionKeyStream,
968 compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
969 grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
970 aggregate_spec: (AggregateKind, &str),
971 row_counters: (&mut usize, &mut usize),
972 ) -> Result<GroupedRow, InternalError> {
973 let (aggregate_kind, target_field) = aggregate_spec;
974 let (scanned_rows, filtered_rows) = row_counters;
975 let field_slot = if aggregate_kind.is_sum() {
976 Self::resolve_numeric_field_slot(target_field)?
977 } else {
978 Self::resolve_any_field_slot(target_field)?
979 };
980 let mut distinct_values = GroupKeySet::new();
981 let mut count = 0u32;
982 let mut sum = Decimal::ZERO;
983 let mut saw_sum_value = false;
984
985 grouped_execution_context
986 .record_implicit_single_group::<E>()
987 .map_err(Self::map_group_error)?;
988
989 while let Some(key) = resolved.key_stream.next_key()? {
990 let row = match plan.scalar_plan().consistency {
991 MissingRowPolicy::Error => ctx.read_strict(&key),
992 MissingRowPolicy::Ignore => ctx.read(&key),
993 };
994 let row = match row {
995 Ok(row) => row,
996 Err(err) if err.is_not_found() => continue,
997 Err(err) => return Err(err),
998 };
999 *scanned_rows = scanned_rows.saturating_add(1);
1000 let (_, entity) = Context::<E>::deserialize_row((key, row))?;
1001 if let Some(compiled_predicate) = compiled_predicate
1002 && !compiled_predicate.eval(&entity)
1003 {
1004 continue;
1005 }
1006 *filtered_rows = filtered_rows.saturating_add(1);
1007
1008 let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
1009 .map_err(AggregateFieldValueError::into_internal_error)?;
1010 let distinct_key = distinct_value
1011 .canonical_key()
1012 .map_err(KeyCanonicalError::into_internal_error)?;
1013 let distinct_admitted = grouped_execution_context
1014 .admit_distinct_key(
1015 &mut distinct_values,
1016 grouped_execution_context
1017 .config()
1018 .max_distinct_values_per_group(),
1019 distinct_key,
1020 )
1021 .map_err(Self::map_group_error)?;
1022 if !distinct_admitted {
1023 continue;
1024 }
1025
1026 if aggregate_kind.is_sum() {
1027 let numeric_value =
1028 extract_numeric_field_decimal(&entity, target_field, field_slot)
1029 .map_err(AggregateFieldValueError::into_internal_error)?;
1030 sum += numeric_value;
1031 saw_sum_value = true;
1032 } else {
1033 count = count.saturating_add(1);
1034 }
1035 }
1036
1037 let aggregate_value = if aggregate_kind.is_sum() {
1038 if saw_sum_value {
1039 Value::Decimal(sum)
1040 } else {
1041 Value::Null
1042 }
1043 } else {
1044 Value::Uint(u64::from(count))
1045 };
1046
1047 Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
1048 }
1049
1050 fn page_global_distinct_grouped_row(
1052 row: GroupedRow,
1053 page: Option<&crate::db::query::plan::PageSpec>,
1054 ) -> Vec<GroupedRow> {
1055 let Some(page) = page else {
1056 return vec![row];
1057 };
1058 if page.offset > 0 || page.limit == Some(0) {
1059 return Vec::new();
1060 }
1061
1062 vec![row]
1063 }
1064
1065 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1067 match output {
1068 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1069 AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1070 AggregateOutput::Exists(value) => Value::Bool(*value),
1071 AggregateOutput::Min(value)
1072 | AggregateOutput::Max(value)
1073 | AggregateOutput::First(value)
1074 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1075 }
1076 }
1077
1078 fn group_matches_having(
1080 having: &GroupHavingSpec,
1081 group_fields: &[crate::db::query::plan::FieldSlot],
1082 group_key_value: &Value,
1083 aggregate_values: &[Value],
1084 ) -> Result<bool, InternalError> {
1085 for (index, clause) in having.clauses().iter().enumerate() {
1086 let actual = match clause.symbol() {
1087 GroupHavingSymbol::GroupField(field_slot) => {
1088 let group_key_list = match group_key_value {
1089 Value::List(values) => values,
1090 value => {
1091 return Err(InternalError::query_executor_invariant(format!(
1092 "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1093 )));
1094 }
1095 };
1096 let Some(group_field_offset) = group_fields
1097 .iter()
1098 .position(|group_field| group_field.index() == field_slot.index())
1099 else {
1100 return Err(InternalError::query_executor_invariant(format!(
1101 "grouped HAVING field is not in grouped key projection: field='{}'",
1102 field_slot.field()
1103 )));
1104 };
1105 group_key_list.get(group_field_offset).ok_or_else(|| {
1106 InternalError::query_executor_invariant(format!(
1107 "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1108 group_key_list.len()
1109 ))
1110 })?
1111 }
1112 GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1113 aggregate_values.get(*aggregate_index).ok_or_else(|| {
1114 InternalError::query_executor_invariant(format!(
1115 "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1116 aggregate_values.len()
1117 ))
1118 })?
1119 }
1120 };
1121
1122 if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1123 return Ok(false);
1124 }
1125 }
1126
1127 Ok(true)
1128 }
1129
1130 fn having_compare_values(
1132 actual: &Value,
1133 op: CompareOp,
1134 expected: &Value,
1135 ) -> Result<bool, InternalError> {
1136 let strict = CoercionSpec::default();
1137 let matches = match op {
1138 CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1139 CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1140 CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1141 CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1142 CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1143 CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1144 CompareOp::In
1145 | CompareOp::NotIn
1146 | CompareOp::Contains
1147 | CompareOp::StartsWith
1148 | CompareOp::EndsWith => {
1149 return Err(InternalError::query_executor_invariant(format!(
1150 "unsupported grouped HAVING operator reached executor: {op:?}"
1151 )));
1152 }
1153 };
1154
1155 Ok(matches)
1156 }
1157
1158 fn finalize_path_outcome(
1160 execution_trace: &mut Option<ExecutionTrace>,
1161 optimization: Option<ExecutionOptimization>,
1162 rows_scanned: usize,
1163 rows_returned: usize,
1164 index_predicate_applied: bool,
1165 index_predicate_keys_rejected: u64,
1166 distinct_keys_deduped: u64,
1167 ) {
1168 record_rows_scanned::<E>(rows_scanned);
1169 if let Some(execution_trace) = execution_trace.as_mut() {
1170 execution_trace.set_path_outcome(
1171 optimization,
1172 rows_scanned,
1173 rows_returned,
1174 index_predicate_applied,
1175 index_predicate_keys_rejected,
1176 distinct_keys_deduped,
1177 );
1178 debug_assert_eq!(
1179 execution_trace.keys_scanned,
1180 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1181 "execution trace keys_scanned must match rows_scanned metrics input",
1182 );
1183 }
1184 }
1185
1186 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1188 plan: &AccessPlannedQuery<E::Key>,
1189 cursor_boundary: Option<&CursorBoundary>,
1190 ) -> Result<(), InternalError> {
1191 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1192 return Ok(());
1193 }
1194 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1195
1196 Ok(())
1197 }
1198}