1mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21 db::{
22 Context, Db, GroupedRow,
23 access::AccessPlan,
24 contracts::canonical_value_compare,
25 cursor::{
26 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27 PlannedCursor, decode_pk_cursor_boundary,
28 },
29 data::DataKey,
30 direction::Direction,
31 executor::{
32 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33 KeyOrderComparator, OrderedKeyStreamBox,
34 aggregate::field::{
35 AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
36 extract_orderable_field_value, resolve_any_aggregate_target_slot,
37 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
38 },
39 aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
40 group::{
41 CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
42 grouped_execution_context_from_planner_config,
43 },
44 plan_metrics::{
45 GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
46 record_rows_scanned,
47 },
48 range_token_anchor_key, range_token_from_cursor_anchor,
49 route::aggregate_materialized_fold_direction,
50 validate_executor_plan,
51 },
52 index::IndexCompilePolicy,
53 predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
54 query::plan::{
55 AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
56 LogicalPlan, OrderDirection, grouped_executor_handoff,
57 },
58 response::Response,
59 },
60 error::InternalError,
61 obs::sink::{ExecKind, Span},
62 traits::{EntityKind, EntityValue},
63 types::Decimal,
64 value::Value,
65};
66use std::{cmp::Ordering, marker::PhantomData};
67
/// Continuation cursor attached to a page of results.
///
/// Wraps either a scalar [`ContinuationToken`] or a
/// [`GroupedContinuationToken`], so both page types in this module can expose
/// the same `Option<PageCursor>` field.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Cursor token for a scalar (non-grouped) page.
    Scalar(ContinuationToken),
    /// Cursor token for a grouped page.
    Grouped(GroupedContinuationToken),
}
78
79impl PageCursor {
80 #[must_use]
82 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
83 match self {
84 Self::Scalar(token) => Some(token),
85 Self::Grouped(_) => None,
86 }
87 }
88
89 #[must_use]
91 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
92 match self {
93 Self::Scalar(_) => None,
94 Self::Grouped(token) => Some(token),
95 }
96 }
97}
98
/// Lets a scalar [`ContinuationToken`] be converted with `.into()` / `PageCursor::from`.
impl From<ContinuationToken> for PageCursor {
    /// Wraps the token in [`PageCursor::Scalar`].
    fn from(value: ContinuationToken) -> Self {
        Self::Scalar(value)
    }
}
104
/// Lets a [`GroupedContinuationToken`] be converted with `.into()` / `PageCursor::from`.
impl From<GroupedContinuationToken> for PageCursor {
    /// Wraps the token in [`PageCursor::Grouped`].
    fn from(value: GroupedContinuationToken) -> Self {
        Self::Grouped(value)
    }
}
110
/// One page of scalar load results for entity `E`, paired with an optional
/// continuation cursor.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Rows returned for this page.
    pub(crate) items: Response<E>,

    /// Cursor to resume from; presumably `None` when no further page
    /// exists — confirm against the page finalization code.
    pub(crate) next_cursor: Option<PageCursor>,
}
124
/// One page of grouped aggregate rows, paired with an optional continuation
/// cursor.
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    /// Grouped rows emitted for this page.
    pub(in crate::db) rows: Vec<GroupedRow>,
    /// Cursor to resume from. The grouped executor in this file only sets it
    /// when more groups remained after the page limit was reached.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
135
/// Access-path shape reported in an [`ExecutionTrace`].
///
/// The value is derived from the plan's `AccessPlan` by the
/// `access_path_variant` helper in the `trace` submodule; the exact mapping
/// from access plans to variants lives there.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
153
/// Optimization reported by an execution path.
///
/// Stored in [`ExecutionTrace::optimization`] and carried by
/// `FastPathKeyResult::optimization`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
166
/// Diagnostic summary of a single load execution.
///
/// The executor in this file only builds a trace when its `debug` flag is
/// set. Counters start at zero and are filled in once the execution path has
/// finished.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Access-path shape of the executed plan.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Order direction of the execution.
    pub direction: OrderDirection,
    /// Optimization reported by the execution path, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Scan counter reported by the execution path (clamped to `u64::MAX`).
    pub keys_scanned: u64,
    /// Number of rows returned to the caller (clamped to `u64::MAX`).
    pub rows_returned: u64,
    /// Whether the execution resumed from a continuation cursor.
    pub continuation_applied: bool,
    /// Whether an index predicate was applied, as reported by the execution path.
    pub index_predicate_applied: bool,
    /// Number of keys rejected by the index predicate, as reported by the execution path.
    pub index_predicate_keys_rejected: u64,
    /// Number of keys removed by DISTINCT de-duplication, as reported by the execution path.
    pub distinct_keys_deduped: u64,
}
186
187impl ExecutionTrace {
188 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
189 Self {
190 access_path_variant: access_path_variant(access),
191 direction: execution_order_direction(direction),
192 optimization: None,
193 keys_scanned: 0,
194 rows_returned: 0,
195 continuation_applied,
196 index_predicate_applied: false,
197 index_predicate_keys_rejected: 0,
198 distinct_keys_deduped: 0,
199 }
200 }
201
202 fn set_path_outcome(
203 &mut self,
204 optimization: Option<ExecutionOptimization>,
205 keys_scanned: usize,
206 rows_returned: usize,
207 index_predicate_applied: bool,
208 index_predicate_keys_rejected: u64,
209 distinct_keys_deduped: u64,
210 ) {
211 self.optimization = optimization;
212 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
213 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
214 self.index_predicate_applied = index_predicate_applied;
215 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
216 self.distinct_keys_deduped = distinct_keys_deduped;
217 }
218}
219
/// Builds the key-stream comparator for `direction`.
///
/// Thin `const` wrapper over [`KeyOrderComparator::from_direction`].
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
226
/// Output of a fast-path key resolution: the key stream plus the bookkeeping
/// that accompanies it.
///
/// NOTE(review): presumably produced by the fast-path submodules declared at
/// the top of this file — confirm at the construction sites.
pub(in crate::db::executor) struct FastPathKeyResult {
    /// Ordered stream of keys produced by the fast path.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Scan counter reported by the fast path.
    pub(in crate::db::executor) rows_scanned: usize,
    /// Which optimization the fast path represents.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
239
/// Executor for load plans over entity type `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle for the canister that owns `E`.
    db: Db<E::Canister>,
    /// When `true`, executions build and return an [`ExecutionTrace`].
    debug: bool,
    /// Ties the executor to `E` without storing a value of that type.
    _marker: PhantomData<E>,
}
253
254impl<E> LoadExecutor<E>
255where
256 E: EntityKind + EntityValue,
257{
/// Creates a load executor over `db`.
///
/// `debug` controls whether executions produce an [`ExecutionTrace`].
#[must_use]
pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
    Self {
        db,
        debug,
        _marker: PhantomData,
    }
}
267
268 pub(in crate::db::executor) fn recovered_context(
270 &self,
271 ) -> Result<crate::db::Context<'_, E>, InternalError> {
272 self.db.recovered_context::<E>()
273 }
274
/// Resolves `target_field` on `E` to a [`FieldSlot`] via
/// `resolve_orderable_aggregate_target_slot`.
///
/// # Errors
///
/// Any [`AggregateFieldValueError`] from the resolver is converted into an
/// [`InternalError`].
pub(in crate::db::executor) fn resolve_orderable_field_slot(
    target_field: &str,
) -> Result<FieldSlot, InternalError> {
    resolve_orderable_aggregate_target_slot::<E>(target_field)
        .map_err(AggregateFieldValueError::into_internal_error)
}
283
/// Resolves `target_field` on `E` to a [`FieldSlot`] via
/// `resolve_any_aggregate_target_slot`.
///
/// # Errors
///
/// Any [`AggregateFieldValueError`] from the resolver is converted into an
/// [`InternalError`].
pub(in crate::db::executor) fn resolve_any_field_slot(
    target_field: &str,
) -> Result<FieldSlot, InternalError> {
    resolve_any_aggregate_target_slot::<E>(target_field)
        .map_err(AggregateFieldValueError::into_internal_error)
}
292
/// Resolves `target_field` on `E` to a [`FieldSlot`] via
/// `resolve_numeric_aggregate_target_slot`.
///
/// # Errors
///
/// Any [`AggregateFieldValueError`] from the resolver is converted into an
/// [`InternalError`].
pub(in crate::db::executor) fn resolve_numeric_field_slot(
    target_field: &str,
) -> Result<FieldSlot, InternalError> {
    resolve_numeric_aggregate_target_slot::<E>(target_field)
        .map_err(AggregateFieldValueError::into_internal_error)
}
301
302 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
303 self.execute_paged_with_cursor(plan, PlannedCursor::none())
304 .map(|page| page.items)
305 }
306
307 pub(in crate::db) fn execute_paged_with_cursor(
308 &self,
309 plan: ExecutablePlan<E>,
310 cursor: impl Into<PlannedCursor>,
311 ) -> Result<CursorPage<E>, InternalError> {
312 self.execute_paged_with_cursor_traced(plan, cursor)
313 .map(|(page, _)| page)
314 }
315
/// Executes a scalar (non-grouped) load plan from `cursor`, returning the
/// page and, when the executor's `debug` flag is set, an [`ExecutionTrace`].
///
/// # Errors
///
/// Returns a query-executor invariant error when `plan` is a grouped plan or
/// is not a load plan. Also propagates errors from cursor revalidation,
/// index spec lookup, route planning, plan validation, context recovery and
/// materialization.
pub(in crate::db) fn execute_paged_with_cursor_traced(
    &self,
    plan: ExecutablePlan<E>,
    cursor: impl Into<PlannedCursor>,
) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
    // Grouped plans must go through the grouped entry point instead.
    if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
        return Err(InternalError::query_executor_invariant(
            "grouped plans require execute_grouped pagination entrypoints",
        ));
    }

    // Revalidate the cursor against this plan, then split it into the
    // boundary and the optional index-range token used below.
    let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
    let cursor_boundary = cursor.boundary().cloned();
    let index_range_token = cursor
        .index_range_anchor()
        .map(range_token_from_cursor_anchor);

    if !plan.mode().is_load() {
        return Err(InternalError::query_executor_invariant(
            "load executor requires load plans",
        ));
    }

    // Everything that needs the `ExecutablePlan` wrapper is read here,
    // before `into_inner` consumes it further down.
    let continuation_signature = plan.continuation_signature();
    let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
    let index_range_specs = plan.index_range_specs()?.to_vec();
    let route_plan = Self::build_execution_route_plan_for_load(
        plan.as_inner(),
        cursor_boundary.as_ref(),
        index_range_token.as_ref(),
        None,
    )?;
    // Any continuation mode other than `Initial` counts as a resumed execution.
    let continuation_applied = !matches!(
        route_plan.continuation_mode(),
        crate::db::executor::route::ContinuationMode::Initial
    );
    let direction = route_plan.direction();
    debug_assert_eq!(
        route_plan.window().effective_offset,
        ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
        "route window effective offset must match logical plan offset semantics",
    );
    // The trace is only allocated when debugging is enabled.
    let mut execution_trace = self
        .debug
        .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
    let plan = plan.into_inner();
    let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

    // The closure scopes the span and the `?` early exits; it borrows
    // `execution_trace` mutably so finalization can fill it in.
    let result = (|| {
        let mut span = Span::<E>::new(ExecKind::Load);

        validate_executor_plan::<E>(&plan)?;
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                direction,
            },
            execution_preparation: &execution_preparation,
        };

        record_plan_metrics(&plan.access);
        let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
            &execution_inputs,
            &route_plan,
            cursor_boundary.as_ref(),
            continuation_signature,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let page = materialized.page;
        let rows_scanned = materialized.rows_scanned;
        let post_access_rows = materialized.post_access_rows;
        let optimization = materialized.optimization;
        let index_predicate_applied = materialized.index_predicate_applied;
        let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
        let distinct_keys_deduped = materialized.distinct_keys_deduped;

        Ok(Self::finalize_execution(
            page,
            optimization,
            rows_scanned,
            post_access_rows,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
            &mut span,
            &mut execution_trace,
        ))
    })();

    result.map(|page| (page, execution_trace))
}
414
415 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
416 &self,
417 plan: ExecutablePlan<E>,
418 cursor: impl Into<GroupedPlannedCursor>,
419 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
420 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
421 return Err(InternalError::query_executor_invariant(
422 "grouped execution requires grouped logical plans",
423 ));
424 }
425
426 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
427
428 self.execute_grouped_path(plan, cursor)
429 }
430
/// Runs a grouped plan end to end: scans rows, folds them into per-group
/// aggregate engines, finalizes and aligns the aggregate outputs, applies
/// HAVING and pagination, and builds the grouped page plus optional trace.
///
/// Plans matching the global DISTINCT field-aggregate shape (see
/// `global_distinct_field_aggregate_spec`) take a dedicated early-return
/// branch that produces at most one row and never a continuation cursor.
///
/// # Errors
///
/// Returns query-executor invariant errors when planner/executor contracts
/// are violated (missing observability payload, field-target aggregates
/// reaching the engine path, no aggregates, misaligned or duplicate finalized
/// group keys, non-list group keys). Returns a cursor plan error when a
/// continuation cursor is supplied for the global DISTINCT shape. Also
/// propagates storage, decoding, canonicalization and group-budget errors.
#[expect(clippy::too_many_lines)]
fn execute_grouped_path(
    &self,
    plan: ExecutablePlan<E>,
    cursor: GroupedPlannedCursor,
) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
    // --- Planner handoff and route planning -------------------------------
    validate_executor_plan::<E>(plan.as_inner())?;
    let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
    let grouped_execution = grouped_handoff.execution();
    let group_fields = grouped_handoff.group_fields().to_vec();
    let grouped_aggregates = grouped_handoff.aggregates().to_vec();
    let grouped_having = grouped_handoff.having().cloned();
    let grouped_route_plan =
        Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
    let grouped_route_observability =
        grouped_route_plan.grouped_observability().ok_or_else(|| {
            InternalError::query_executor_invariant(
                "grouped route planning must emit grouped observability payload",
            )
        })?;
    let direction = grouped_route_plan.direction();
    // A non-empty cursor means this call resumes an earlier page.
    let continuation_applied = !cursor.is_empty();
    let mut execution_trace = self
        .debug
        .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
    let continuation_signature = plan.continuation_signature();
    let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
    let index_range_specs = plan.index_range_specs()?.to_vec();

    // --- Grouped budget context and debug-only route invariants -----------
    let mut grouped_execution_context =
        grouped_execution_context_from_planner_config(Some(grouped_execution));
    let grouped_budget = grouped_budget_observability(&grouped_execution_context);
    debug_assert!(
        grouped_budget.max_groups() >= grouped_budget.groups()
            && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
            && grouped_execution_context
                .config()
                .max_distinct_values_total()
                >= grouped_budget.distinct_values()
            && grouped_budget.aggregate_states() >= grouped_budget.groups(),
        "grouped budget observability invariants must hold at grouped route entry"
    );

    let grouped_route_outcome = grouped_route_observability.outcome();
    let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
    let grouped_route_eligible = grouped_route_observability.eligible();
    let grouped_route_execution_mode = grouped_route_observability.execution_mode();
    // Translate the route strategy into the metrics-side strategy label.
    let grouped_plan_metrics_strategy =
        match grouped_route_observability.grouped_execution_strategy() {
            crate::db::executor::route::GroupedExecutionStrategy::HashGroup => {
                GroupedPlanMetricsStrategy::HashMaterialized
            }
            crate::db::executor::route::GroupedExecutionStrategy::OrderedGroup => {
                GroupedPlanMetricsStrategy::OrderedStreaming
            }
        };
    debug_assert!(
        grouped_route_eligible == grouped_route_rejection_reason.is_none(),
        "grouped route eligibility and rejection reason must stay aligned",
    );
    debug_assert!(
        grouped_route_outcome
            != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
            || grouped_route_rejection_reason.is_some(),
        "grouped rejected outcomes must carry a rejection reason",
    );
    debug_assert!(
        matches!(
            grouped_route_execution_mode,
            crate::db::executor::route::ExecutionMode::Materialized
        ),
        "grouped execution route must remain blocking/materialized",
    );

    // --- Aggregate engine setup -------------------------------------------
    // `Some` selects the dedicated global DISTINCT branch below; in that case
    // no per-group engines are created.
    let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
        group_fields.as_slice(),
        grouped_aggregates.as_slice(),
        grouped_having.as_ref(),
    )?;
    let (mut grouped_engines, mut short_circuit_keys) =
        if global_distinct_field_aggregate.is_none() {
            let grouped_engines = grouped_aggregates
                .iter()
                .map(|aggregate| {
                    // Field-target aggregates are not accepted on the engine path.
                    if aggregate.target_field().is_some() {
                        return Err(InternalError::query_executor_invariant(format!(
                            "grouped field-target aggregate reached executor after planning: {:?}",
                            aggregate.kind()
                        )));
                    }

                    Ok(grouped_execution_context.create_grouped_engine::<E>(
                        aggregate.kind(),
                        aggregate_materialized_fold_direction(aggregate.kind()),
                        aggregate.distinct(),
                    ))
                })
                .collect::<Result<Vec<_>, _>>()?;
            // One list per engine of the group keys for which that engine has
            // signalled `FoldControl::Break`.
            let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];

            (grouped_engines, short_circuit_keys)
        } else {
            (Vec::new(), Vec::new())
        };
    let plan = plan.into_inner();
    let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

    // --- Key stream resolution --------------------------------------------
    let mut span = Span::<E>::new(ExecKind::Load);
    let ctx = self.db.recovered_context::<E>()?;
    let execution_inputs = ExecutionInputs {
        ctx: &ctx,
        plan: &plan,
        stream_bindings: AccessStreamBindings {
            index_prefix_specs: index_prefix_specs.as_slice(),
            index_range_specs: index_range_specs.as_slice(),
            // The grouped path never passes an index-range anchor.
            index_range_anchor: None,
            direction,
        },
        execution_preparation: &execution_preparation,
    };
    record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
    let mut resolved = Self::resolve_execution_key_stream_without_distinct(
        &execution_inputs,
        &grouped_route_plan,
        IndexCompilePolicy::ConservativeSubset,
    )?;
    let mut scanned_rows = 0usize;
    let mut filtered_rows = 0usize;
    let compiled_predicate = execution_preparation.compiled_predicate();

    // --- Global DISTINCT field-aggregate branch (early return) ------------
    if let Some((aggregate_kind, target_field)) = global_distinct_field_aggregate {
        if !cursor.is_empty() {
            return Err(InternalError::from_cursor_plan_error(
                crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
                    "global DISTINCT grouped aggregates do not support continuation cursors",
                ),
            ));
        }

        let global_row = Self::execute_global_distinct_field_aggregate(
            &plan,
            &ctx,
            &mut resolved,
            compiled_predicate,
            &mut grouped_execution_context,
            (aggregate_kind, target_field.as_str()),
            (&mut scanned_rows, &mut filtered_rows),
        )?;
        let page_rows = Self::page_global_distinct_grouped_row(
            global_row,
            plan.scalar_plan().page.as_ref(),
        );
        // An override from the resolved stream wins over the local counter.
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));

        return Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor: None,
            },
            execution_trace,
        ));
    }

    // --- Row scan: fold every matching row into each aggregate engine -----
    while let Some(key) = resolved.key_stream.next_key()? {
        // The read flavour follows the plan's missing-row consistency policy.
        let row = match plan.scalar_plan().consistency {
            MissingRowPolicy::Error => ctx.read_strict(&key),
            MissingRowPolicy::Ignore => ctx.read(&key),
        };
        // Errors classified as not-found skip the key; all others abort.
        let row = match row {
            Ok(row) => row,
            Err(err) if err.is_not_found() => continue,
            Err(err) => return Err(err),
        };
        // Counted before predicate evaluation, so rejected rows are included.
        scanned_rows = scanned_rows.saturating_add(1);
        let (id, entity) = Context::<E>::deserialize_row((key, row))?;
        if let Some(compiled_predicate) = compiled_predicate
            && !compiled_predicate.eval(&entity)
        {
            continue;
        }
        filtered_rows = filtered_rows.saturating_add(1);

        // Build the canonical group key from the entity's group-field values.
        let group_values = group_fields
            .iter()
            .map(|field| {
                entity.get_value_by_index(field.index()).ok_or_else(|| {
                    InternalError::query_executor_invariant(format!(
                        "grouped field slot missing on entity: index={}",
                        field.index()
                    ))
                })
            })
            .collect::<Result<Vec<_>, _>>()?;
        let group_key = Value::List(group_values)
            .canonical_key()
            .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
        let canonical_group_value = group_key.canonical_value().clone();
        let data_key = DataKey::try_new::<E>(id.key())?;

        for (index, engine) in grouped_engines.iter_mut().enumerate() {
            // Skip groups this engine already finished (linear scan of its list).
            if short_circuit_keys[index].iter().any(|done| {
                canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
            }) {
                continue;
            }

            let fold_control = engine
                .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                .map_err(Self::map_group_error)?;
            // `Break` means the engine needs no more rows for this group.
            if matches!(fold_control, FoldControl::Break) {
                short_circuit_keys[index].push(canonical_group_value.clone());
            }
        }
    }

    // --- Finalize engines --------------------------------------------------
    let aggregate_count = grouped_engines.len();
    if aggregate_count == 0 {
        return Err(InternalError::query_executor_invariant(
            "grouped execution requires at least one aggregate terminal",
        ));
    }
    let mut finalized_iters = grouped_engines
        .into_iter()
        .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
        .collect::<Result<Vec<_>, _>>()?;
    // The first engine drives iteration; the remaining "sibling" iterators
    // are advanced in lock-step with it below.
    let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
        InternalError::query_executor_invariant("missing grouped primary iterator")
    })?;

    // --- Pagination parameters ---------------------------------------------
    let initial_offset = plan
        .scalar_plan()
        .page
        .as_ref()
        .map_or(0, |page| page.offset);
    // Offset value stored in the next cursor: the plan's offset on a first
    // page, otherwise the one carried by the incoming cursor.
    let resume_initial_offset = if cursor.is_empty() {
        initial_offset
    } else {
        cursor.initial_offset()
    };
    let resume_boundary = cursor
        .last_group_key()
        .map(|last_group_key| Value::List(last_group_key.to_vec()));
    // The offset is only skipped on the first page (empty cursor).
    let apply_initial_offset = cursor.is_empty();
    // A limit that does not fit in `usize` is treated as no limit.
    let limit = plan
        .scalar_plan()
        .page
        .as_ref()
        .and_then(|page| page.limit)
        .and_then(|limit| usize::try_from(limit).ok());
    let initial_offset_for_page = if apply_initial_offset {
        usize::try_from(initial_offset).unwrap_or(usize::MAX)
    } else {
        0
    };
    // Candidate cap of limit + offset + 1; the extra row is what lets the
    // page loop detect that more groups exist. `None` (no limit, or overflow)
    // means all candidates are kept.
    let selection_bound = limit.and_then(|limit| {
        limit
            .checked_add(initial_offset_for_page)
            .and_then(|count| count.checked_add(1))
    });
    let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
    // With `limit == Some(0)` no candidates are collected at all.
    if limit.is_none_or(|limit| limit != 0) {
        for primary_output in primary_iter.by_ref() {
            let group_key_value = primary_output.group_key().canonical_value().clone();
            let mut aggregate_values = Vec::with_capacity(aggregate_count);
            aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
            // Each sibling must yield the same group key at the same position.
            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
                let sibling_output = sibling_iter.next().ok_or_else(|| {
                    InternalError::query_executor_invariant(format!(
                        "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
                    ))
                })?;
                let sibling_group_key = sibling_output.group_key().canonical_value();
                if canonical_value_compare(sibling_group_key, &group_key_value)
                    != Ordering::Equal
                {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
                    )));
                }
                aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
            }
            debug_assert_eq!(
                aggregate_values.len(),
                aggregate_count,
                "grouped aggregate value alignment must preserve declared aggregate count",
            );
            // HAVING runs before the resume-boundary check and the cap.
            if let Some(grouped_having) = grouped_having.as_ref()
                && !Self::group_matches_having(
                    grouped_having,
                    group_fields.as_slice(),
                    &group_key_value,
                    aggregate_values.as_slice(),
                )?
            {
                continue;
            }

            // When resuming, only groups strictly after the cursor's last
            // group key are kept.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary)
                    != Ordering::Greater
            {
                continue;
            }

            if let Some(selection_bound) = selection_bound {
                // Bounded case: keep the list sorted by canonical key and
                // drop the largest entry once it exceeds the cap.
                match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
                    canonical_value_compare(existing_key, &group_key_value)
                }) {
                    Ok(_) => {
                        return Err(InternalError::query_executor_invariant(format!(
                            "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
                        )));
                    }
                    Err(insert_index) => {
                        grouped_candidate_rows
                            .insert(insert_index, (group_key_value, aggregate_values));
                        if grouped_candidate_rows.len() > selection_bound {
                            let _ = grouped_candidate_rows.pop();
                        }
                    }
                }
            } else {
                // Unbounded case: collect everything, sort once afterwards.
                grouped_candidate_rows.push((group_key_value, aggregate_values));
            }
        }
        // Siblings must be exhausted together with the primary iterator.
        for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
            if sibling_iter.next().is_some() {
                return Err(InternalError::query_executor_invariant(format!(
                    "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
                )));
            }
        }
        if selection_bound.is_none() {
            grouped_candidate_rows
                .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
        }
    }

    // --- Page assembly: skip the offset, emit up to `limit` rows ----------
    let mut page_rows = Vec::<GroupedRow>::new();
    let mut last_emitted_group_key: Option<Vec<Value>> = None;
    let mut has_more = false;
    let mut groups_skipped_for_offset = 0usize;
    for (group_key_value, aggregate_values) in grouped_candidate_rows {
        if groups_skipped_for_offset < initial_offset_for_page {
            groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
            continue;
        }
        // A candidate remaining once the page is full means another page exists.
        if let Some(limit) = limit
            && page_rows.len() >= limit
        {
            has_more = true;
            break;
        }

        let emitted_group_key = match group_key_value {
            Value::List(values) => values,
            value => {
                return Err(InternalError::query_executor_invariant(format!(
                    "grouped canonical key must be Value::List, found {value:?}"
                )));
            }
        };
        last_emitted_group_key = Some(emitted_group_key.clone());
        page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
    }

    // --- Continuation cursor -----------------------------------------------
    // Only emitted when more groups remain; it records the last emitted
    // group key and is always built with `Direction::Asc`.
    let next_cursor = if has_more {
        last_emitted_group_key.map(|last_group_key| {
            PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                continuation_signature,
                last_group_key,
                Direction::Asc,
                resume_initial_offset,
            ))
        })
    } else {
        None
    };

    // --- Trace and span bookkeeping ------------------------------------------
    let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
    let optimization = resolved.optimization;
    let index_predicate_applied = resolved.index_predicate_applied;
    let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
    let distinct_keys_deduped = resolved
        .distinct_keys_deduped_counter
        .as_ref()
        .map_or(0, |counter| counter.get());
    let rows_returned = page_rows.len();

    Self::finalize_path_outcome(
        &mut execution_trace,
        optimization,
        rows_scanned,
        rows_returned,
        index_predicate_applied,
        index_predicate_keys_rejected,
        distinct_keys_deduped,
    );
    span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
    debug_assert!(
        filtered_rows >= rows_returned,
        "grouped pagination must return at most filtered row cardinality",
    );

    Ok((
        GroupedCursorPage {
            rows: page_rows,
            next_cursor,
        },
        execution_trace,
    ))
}
870
871 fn map_group_error(err: GroupError) -> InternalError {
873 match err {
874 GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
875 InternalError::executor_internal(err.to_string())
876 }
877 GroupError::Internal(inner) => inner,
878 }
879 }
880
881 fn global_distinct_field_aggregate_spec(
884 group_fields: &[crate::db::query::plan::FieldSlot],
885 aggregates: &[GroupAggregateSpec],
886 having: Option<&GroupHavingSpec>,
887 ) -> Result<Option<(AggregateKind, String)>, InternalError> {
888 if !group_fields.is_empty() {
889 return Ok(None);
890 }
891 if aggregates.is_empty() {
892 return Ok(None);
893 }
894 if aggregates
895 .iter()
896 .all(|aggregate| aggregate.target_field().is_none())
897 {
898 return Ok(None);
899 }
900 if having.is_some() {
901 return Err(InternalError::query_executor_invariant(
902 "global DISTINCT grouped aggregate shape does not support HAVING",
903 ));
904 }
905 if aggregates.len() != 1 {
906 return Err(InternalError::query_executor_invariant(
907 "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
908 ));
909 }
910
911 let aggregate = &aggregates[0];
912 let Some(target_field) = aggregate.target_field() else {
913 return Err(InternalError::query_executor_invariant(
914 "global DISTINCT grouped aggregate shape requires field-target aggregate",
915 ));
916 };
917 if !aggregate.distinct() {
918 return Err(InternalError::query_executor_invariant(
919 "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
920 ));
921 }
922 if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
923 return Err(InternalError::query_executor_invariant(format!(
924 "unsupported global DISTINCT grouped aggregate kind: {:?}",
925 aggregate.kind()
926 )));
927 }
928
929 Ok(Some((aggregate.kind(), target_field.to_string())))
930 }
931
/// Computes a global (no group-by) DISTINCT aggregate over one field and
/// returns it as a single [`GroupedRow`] with an empty group key.
///
/// Drains `resolved.key_stream`, reads and filters each row, and folds only
/// the first occurrence of each distinct field value. `row_counters` is
/// `(scanned_rows, filtered_rows)`; both are incremented in place.
///
/// The result value is `Value::Decimal(sum)` for a sum (or `Value::Null` when
/// no value was folded), and `Value::Uint(count)` otherwise.
///
/// # Errors
///
/// Propagates field-slot resolution, storage, decoding, field-extraction and
/// canonicalization errors. Returns a mapped
/// `GroupError::DistinctBudgetExceeded` when the total or per-group
/// distinct-value limit would be exceeded.
fn execute_global_distinct_field_aggregate(
    plan: &AccessPlannedQuery<E::Key>,
    ctx: &Context<'_, E>,
    resolved: &mut ResolvedExecutionKeyStream,
    compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
    grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
    aggregate_spec: (AggregateKind, &str),
    row_counters: (&mut usize, &mut usize),
) -> Result<GroupedRow, InternalError> {
    let (aggregate_kind, target_field) = aggregate_spec;
    let (scanned_rows, filtered_rows) = row_counters;
    // Sums resolve the target through the numeric resolver; other kinds
    // accept any field.
    let field_slot = if aggregate_kind.is_sum() {
        Self::resolve_numeric_field_slot(target_field)?
    } else {
        Self::resolve_any_field_slot(target_field)?
    };
    let mut distinct_values = GroupKeySet::new();
    let mut count = 0u32;
    let mut sum = Decimal::ZERO;
    let mut saw_sum_value = false;

    // Registers the single implicit group with the execution context before
    // any rows are scanned.
    grouped_execution_context
        .record_implicit_single_group::<E>()
        .map_err(Self::map_group_error)?;

    while let Some(key) = resolved.key_stream.next_key()? {
        // The read flavour follows the plan's missing-row consistency policy.
        let row = match plan.scalar_plan().consistency {
            MissingRowPolicy::Error => ctx.read_strict(&key),
            MissingRowPolicy::Ignore => ctx.read(&key),
        };
        // Errors classified as not-found skip the key; all others abort.
        let row = match row {
            Ok(row) => row,
            Err(err) if err.is_not_found() => continue,
            Err(err) => return Err(err),
        };
        // Counted before predicate evaluation, so rejected rows are included.
        *scanned_rows = scanned_rows.saturating_add(1);
        let (_, entity) = Context::<E>::deserialize_row((key, row))?;
        if let Some(compiled_predicate) = compiled_predicate
            && !compiled_predicate.eval(&entity)
        {
            continue;
        }
        *filtered_rows = filtered_rows.saturating_add(1);

        let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
            .map_err(AggregateFieldValueError::into_internal_error)?;
        let distinct_key = distinct_value
            .canonical_key()
            .map_err(KeyCanonicalError::into_internal_error)?;
        // Already-seen values are skipped before any budget accounting.
        if distinct_values.contains_key(&distinct_key) {
            continue;
        }

        // Check the total distinct-value budget before inserting.
        let attempted_total = grouped_execution_context
            .budget()
            .distinct_values()
            .saturating_add(1);
        if attempted_total
            > grouped_execution_context
                .config()
                .max_distinct_values_total()
        {
            return Err(Self::map_group_error(GroupError::DistinctBudgetExceeded {
                resource: "distinct_values_total",
                attempted: attempted_total,
                limit: grouped_execution_context
                    .config()
                    .max_distinct_values_total(),
            }));
        }

        // Check the per-group budget; the local set is the only group here.
        let attempted_per_group = u64::try_from(distinct_values.len())
            .unwrap_or(u64::MAX)
            .saturating_add(1);
        if attempted_per_group
            > grouped_execution_context
                .config()
                .max_distinct_values_per_group()
        {
            return Err(Self::map_group_error(GroupError::DistinctBudgetExceeded {
                resource: "distinct_values_per_group",
                attempted: attempted_per_group,
                limit: grouped_execution_context
                    .config()
                    .max_distinct_values_per_group(),
            }));
        }

        let inserted = distinct_values.insert_key(distinct_key);
        debug_assert!(inserted, "new global distinct key must insert exactly once");
        grouped_execution_context
            .record_distinct_value()
            .map_err(Self::map_group_error)?;

        // Fold the newly seen distinct value into the running aggregate.
        if aggregate_kind.is_sum() {
            let numeric_value =
                extract_numeric_field_decimal(&entity, target_field, field_slot)
                    .map_err(AggregateFieldValueError::into_internal_error)?;
            sum += numeric_value;
            saw_sum_value = true;
        } else {
            // The counter is a `u32` and saturates rather than wrapping.
            count = count.saturating_add(1);
        }
    }

    let aggregate_value = if aggregate_kind.is_sum() {
        // A sum with no folded values is reported as null, not zero.
        if saw_sum_value {
            Value::Decimal(sum)
        } else {
            Value::Null
        }
    } else {
        Value::Uint(u64::from(count))
    };

    Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
}
1051
1052 fn page_global_distinct_grouped_row(
1054 row: GroupedRow,
1055 page: Option<&crate::db::query::plan::PageSpec>,
1056 ) -> Vec<GroupedRow> {
1057 let Some(page) = page else {
1058 return vec![row];
1059 };
1060 if page.offset > 0 || page.limit == Some(0) {
1061 return Vec::new();
1062 }
1063
1064 vec![row]
1065 }
1066
1067 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1069 match output {
1070 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1071 AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1072 AggregateOutput::Exists(value) => Value::Bool(*value),
1073 AggregateOutput::Min(value)
1074 | AggregateOutput::Max(value)
1075 | AggregateOutput::First(value)
1076 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1077 }
1078 }
1079
1080 fn group_matches_having(
1082 having: &GroupHavingSpec,
1083 group_fields: &[crate::db::query::plan::FieldSlot],
1084 group_key_value: &Value,
1085 aggregate_values: &[Value],
1086 ) -> Result<bool, InternalError> {
1087 for (index, clause) in having.clauses().iter().enumerate() {
1088 let actual = match clause.symbol() {
1089 GroupHavingSymbol::GroupField(field_slot) => {
1090 let group_key_list = match group_key_value {
1091 Value::List(values) => values,
1092 value => {
1093 return Err(InternalError::query_executor_invariant(format!(
1094 "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1095 )));
1096 }
1097 };
1098 let Some(group_field_offset) = group_fields
1099 .iter()
1100 .position(|group_field| group_field.index() == field_slot.index())
1101 else {
1102 return Err(InternalError::query_executor_invariant(format!(
1103 "grouped HAVING field is not in grouped key projection: field='{}'",
1104 field_slot.field()
1105 )));
1106 };
1107 group_key_list.get(group_field_offset).ok_or_else(|| {
1108 InternalError::query_executor_invariant(format!(
1109 "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1110 group_key_list.len()
1111 ))
1112 })?
1113 }
1114 GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1115 aggregate_values.get(*aggregate_index).ok_or_else(|| {
1116 InternalError::query_executor_invariant(format!(
1117 "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1118 aggregate_values.len()
1119 ))
1120 })?
1121 }
1122 };
1123
1124 if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1125 return Ok(false);
1126 }
1127 }
1128
1129 Ok(true)
1130 }
1131
1132 fn having_compare_values(
1134 actual: &Value,
1135 op: CompareOp,
1136 expected: &Value,
1137 ) -> Result<bool, InternalError> {
1138 let strict = CoercionSpec::default();
1139 let matches = match op {
1140 CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1141 CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1142 CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1143 CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1144 CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1145 CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1146 CompareOp::In
1147 | CompareOp::NotIn
1148 | CompareOp::Contains
1149 | CompareOp::StartsWith
1150 | CompareOp::EndsWith => {
1151 return Err(InternalError::query_executor_invariant(format!(
1152 "unsupported grouped HAVING operator reached executor: {op:?}"
1153 )));
1154 }
1155 };
1156
1157 Ok(matches)
1158 }
1159
1160 fn finalize_path_outcome(
1162 execution_trace: &mut Option<ExecutionTrace>,
1163 optimization: Option<ExecutionOptimization>,
1164 rows_scanned: usize,
1165 rows_returned: usize,
1166 index_predicate_applied: bool,
1167 index_predicate_keys_rejected: u64,
1168 distinct_keys_deduped: u64,
1169 ) {
1170 record_rows_scanned::<E>(rows_scanned);
1171 if let Some(execution_trace) = execution_trace.as_mut() {
1172 execution_trace.set_path_outcome(
1173 optimization,
1174 rows_scanned,
1175 rows_returned,
1176 index_predicate_applied,
1177 index_predicate_keys_rejected,
1178 distinct_keys_deduped,
1179 );
1180 debug_assert_eq!(
1181 execution_trace.keys_scanned,
1182 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1183 "execution trace keys_scanned must match rows_scanned metrics input",
1184 );
1185 }
1186 }
1187
1188 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1190 plan: &AccessPlannedQuery<E::Key>,
1191 cursor_boundary: Option<&CursorBoundary>,
1192 ) -> Result<(), InternalError> {
1193 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1194 return Ok(());
1195 }
1196 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1197
1198 Ok(())
1199 }
1200}