1mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21 db::{
22 Context, Db, GroupedRow,
23 access::AccessPlan,
24 contracts::canonical_value_compare,
25 cursor::{
26 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27 PlannedCursor, decode_pk_cursor_boundary,
28 },
29 data::DataKey,
30 direction::Direction,
31 executor::{
32 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33 KeyOrderComparator, OrderedKeyStreamBox,
34 aggregate::field::{
35 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
36 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
37 },
38 aggregate::{AggregateOutput, FoldControl, GroupError},
39 group::{
40 CanonicalKey, grouped_budget_observability,
41 grouped_execution_context_from_planner_config,
42 },
43 plan_metrics::{
44 GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
45 record_rows_scanned,
46 },
47 range_token_anchor_key, range_token_from_cursor_anchor,
48 route::aggregate_materialized_fold_direction,
49 validate_executor_plan,
50 },
51 index::IndexCompilePolicy,
52 predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
53 query::plan::{
54 AccessPlannedQuery, GroupHavingSpec, GroupHavingSymbol, LogicalPlan, OrderDirection,
55 grouped_executor_handoff,
56 },
57 response::Response,
58 },
59 error::InternalError,
60 obs::sink::{ExecKind, Span},
61 traits::{EntityKind, EntityValue},
62 value::Value,
63};
64use std::{cmp::Ordering, marker::PhantomData};
65
66#[derive(Clone, Debug, Eq, PartialEq)]
72pub(in crate::db) enum PageCursor {
73 Scalar(ContinuationToken),
74 Grouped(GroupedContinuationToken),
75}
76
77impl PageCursor {
78 #[must_use]
80 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
81 match self {
82 Self::Scalar(token) => Some(token),
83 Self::Grouped(_) => None,
84 }
85 }
86
87 #[must_use]
89 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
90 match self {
91 Self::Scalar(_) => None,
92 Self::Grouped(token) => Some(token),
93 }
94 }
95}
96
97impl From<ContinuationToken> for PageCursor {
98 fn from(value: ContinuationToken) -> Self {
99 Self::Scalar(value)
100 }
101}
102
103impl From<GroupedContinuationToken> for PageCursor {
104 fn from(value: GroupedContinuationToken) -> Self {
105 Self::Grouped(value)
106 }
107}
108
109#[derive(Debug)]
117pub(crate) struct CursorPage<E: EntityKind> {
118 pub(crate) items: Response<E>,
119
120 pub(crate) next_cursor: Option<PageCursor>,
121}
122
123#[derive(Debug)]
129pub(in crate::db) struct GroupedCursorPage {
130 pub(in crate::db) rows: Vec<GroupedRow>,
131 pub(in crate::db) next_cursor: Option<PageCursor>,
132}
133
/// Access-path shape reported in an execution trace.
///
/// Each variant names one kind of access plan the executor ran; the trace
/// derives it from the executed `AccessPlan`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
151
/// Fast-path optimization reported in an execution trace, when one applied.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
164
165#[derive(Clone, Copy, Debug, Eq, PartialEq)]
173pub struct ExecutionTrace {
174 pub access_path_variant: ExecutionAccessPathVariant,
175 pub direction: OrderDirection,
176 pub optimization: Option<ExecutionOptimization>,
177 pub keys_scanned: u64,
178 pub rows_returned: u64,
179 pub continuation_applied: bool,
180 pub index_predicate_applied: bool,
181 pub index_predicate_keys_rejected: u64,
182 pub distinct_keys_deduped: u64,
183}
184
185impl ExecutionTrace {
186 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
187 Self {
188 access_path_variant: access_path_variant(access),
189 direction: execution_order_direction(direction),
190 optimization: None,
191 keys_scanned: 0,
192 rows_returned: 0,
193 continuation_applied,
194 index_predicate_applied: false,
195 index_predicate_keys_rejected: 0,
196 distinct_keys_deduped: 0,
197 }
198 }
199
200 fn set_path_outcome(
201 &mut self,
202 optimization: Option<ExecutionOptimization>,
203 keys_scanned: usize,
204 rows_returned: usize,
205 index_predicate_applied: bool,
206 index_predicate_keys_rejected: u64,
207 distinct_keys_deduped: u64,
208 ) {
209 self.optimization = optimization;
210 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
211 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
212 self.index_predicate_applied = index_predicate_applied;
213 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
214 self.distinct_keys_deduped = distinct_keys_deduped;
215 }
216}
217
218pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
220 direction: Direction,
221) -> KeyOrderComparator {
222 KeyOrderComparator::from_direction(direction)
223}
224
225pub(in crate::db::executor) struct FastPathKeyResult {
233 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
234 pub(in crate::db::executor) rows_scanned: usize,
235 pub(in crate::db::executor) optimization: ExecutionOptimization,
236}
237
238#[derive(Clone)]
246pub(crate) struct LoadExecutor<E: EntityKind> {
247 db: Db<E::Canister>,
248 debug: bool,
249 _marker: PhantomData<E>,
250}
251
252impl<E> LoadExecutor<E>
253where
254 E: EntityKind + EntityValue,
255{
256 #[must_use]
258 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
259 Self {
260 db,
261 debug,
262 _marker: PhantomData,
263 }
264 }
265
266 pub(in crate::db::executor) fn recovered_context(
268 &self,
269 ) -> Result<crate::db::Context<'_, E>, InternalError> {
270 self.db.recovered_context::<E>()
271 }
272
273 pub(in crate::db::executor) fn resolve_orderable_field_slot(
276 target_field: &str,
277 ) -> Result<FieldSlot, InternalError> {
278 resolve_orderable_aggregate_target_slot::<E>(target_field)
279 .map_err(AggregateFieldValueError::into_internal_error)
280 }
281
282 pub(in crate::db::executor) fn resolve_any_field_slot(
285 target_field: &str,
286 ) -> Result<FieldSlot, InternalError> {
287 resolve_any_aggregate_target_slot::<E>(target_field)
288 .map_err(AggregateFieldValueError::into_internal_error)
289 }
290
291 pub(in crate::db::executor) fn resolve_numeric_field_slot(
294 target_field: &str,
295 ) -> Result<FieldSlot, InternalError> {
296 resolve_numeric_aggregate_target_slot::<E>(target_field)
297 .map_err(AggregateFieldValueError::into_internal_error)
298 }
299
300 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
301 self.execute_paged_with_cursor(plan, PlannedCursor::none())
302 .map(|page| page.items)
303 }
304
305 pub(in crate::db) fn execute_paged_with_cursor(
306 &self,
307 plan: ExecutablePlan<E>,
308 cursor: impl Into<PlannedCursor>,
309 ) -> Result<CursorPage<E>, InternalError> {
310 self.execute_paged_with_cursor_traced(plan, cursor)
311 .map(|(page, _)| page)
312 }
313
314 pub(in crate::db) fn execute_paged_with_cursor_traced(
315 &self,
316 plan: ExecutablePlan<E>,
317 cursor: impl Into<PlannedCursor>,
318 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
319 if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
320 return Err(InternalError::query_executor_invariant(
321 "grouped plans require execute_grouped pagination entrypoints",
322 ));
323 }
324
325 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
326 let cursor_boundary = cursor.boundary().cloned();
327 let index_range_token = cursor
328 .index_range_anchor()
329 .map(range_token_from_cursor_anchor);
330
331 if !plan.mode().is_load() {
332 return Err(InternalError::query_executor_invariant(
333 "load executor requires load plans",
334 ));
335 }
336
337 let continuation_signature = plan.continuation_signature();
338 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
339 let index_range_specs = plan.index_range_specs()?.to_vec();
340 let route_plan = Self::build_execution_route_plan_for_load(
341 plan.as_inner(),
342 cursor_boundary.as_ref(),
343 index_range_token.as_ref(),
344 None,
345 )?;
346 let continuation_applied = !matches!(
347 route_plan.continuation_mode(),
348 crate::db::executor::route::ContinuationMode::Initial
349 );
350 let direction = route_plan.direction();
351 debug_assert_eq!(
352 route_plan.window().effective_offset,
353 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
354 "route window effective offset must match logical plan offset semantics",
355 );
356 let mut execution_trace = self
357 .debug
358 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
359 let plan = plan.into_inner();
360 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
361
362 let result = (|| {
363 let mut span = Span::<E>::new(ExecKind::Load);
364
365 validate_executor_plan::<E>(&plan)?;
366 let ctx = self.db.recovered_context::<E>()?;
367 let execution_inputs = ExecutionInputs {
368 ctx: &ctx,
369 plan: &plan,
370 stream_bindings: AccessStreamBindings {
371 index_prefix_specs: index_prefix_specs.as_slice(),
372 index_range_specs: index_range_specs.as_slice(),
373 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
374 direction,
375 },
376 execution_preparation: &execution_preparation,
377 };
378
379 record_plan_metrics(&plan.access);
380 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
383 &execution_inputs,
384 &route_plan,
385 cursor_boundary.as_ref(),
386 continuation_signature,
387 IndexCompilePolicy::ConservativeSubset,
388 )?;
389 let page = materialized.page;
390 let rows_scanned = materialized.rows_scanned;
391 let post_access_rows = materialized.post_access_rows;
392 let optimization = materialized.optimization;
393 let index_predicate_applied = materialized.index_predicate_applied;
394 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
395 let distinct_keys_deduped = materialized.distinct_keys_deduped;
396
397 Ok(Self::finalize_execution(
398 page,
399 optimization,
400 rows_scanned,
401 post_access_rows,
402 index_predicate_applied,
403 index_predicate_keys_rejected,
404 distinct_keys_deduped,
405 &mut span,
406 &mut execution_trace,
407 ))
408 })();
409
410 result.map(|page| (page, execution_trace))
411 }
412
413 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
414 &self,
415 plan: ExecutablePlan<E>,
416 cursor: impl Into<GroupedPlannedCursor>,
417 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
418 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
419 return Err(InternalError::query_executor_invariant(
420 "grouped execution requires grouped logical plans",
421 ));
422 }
423
424 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
425
426 self.execute_grouped_path(plan, cursor)
427 }
428
429 #[expect(clippy::too_many_lines)]
431 fn execute_grouped_path(
432 &self,
433 plan: ExecutablePlan<E>,
434 cursor: GroupedPlannedCursor,
435 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
436 validate_executor_plan::<E>(plan.as_inner())?;
437 let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
438 let grouped_execution = grouped_handoff.execution();
439 let group_fields = grouped_handoff.group_fields().to_vec();
440 let grouped_having = grouped_handoff.having().cloned();
441 let grouped_route_plan =
442 Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
443 let grouped_route_observability =
444 grouped_route_plan.grouped_observability().ok_or_else(|| {
445 InternalError::query_executor_invariant(
446 "grouped route planning must emit grouped observability payload",
447 )
448 })?;
449 let direction = grouped_route_plan.direction();
450 let continuation_applied = !cursor.is_empty();
451 let mut execution_trace = self
452 .debug
453 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
454 let continuation_signature = plan.continuation_signature();
455 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
456 let index_range_specs = plan.index_range_specs()?.to_vec();
457
458 let mut grouped_execution_context =
459 grouped_execution_context_from_planner_config(Some(grouped_execution));
460 let grouped_budget = grouped_budget_observability(&grouped_execution_context);
461 debug_assert!(
462 grouped_budget.max_groups() >= grouped_budget.groups()
463 && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
464 && grouped_budget.aggregate_states() >= grouped_budget.groups(),
465 "grouped budget observability invariants must hold at grouped route entry"
466 );
467
468 let grouped_route_outcome = grouped_route_observability.outcome();
470 let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
471 let grouped_route_eligible = grouped_route_observability.eligible();
472 let grouped_route_execution_mode = grouped_route_observability.execution_mode();
473 let grouped_plan_metrics_strategy =
474 match grouped_route_observability.grouped_execution_strategy() {
475 crate::db::executor::route::GroupedExecutionStrategy::HashGroup => {
476 GroupedPlanMetricsStrategy::HashMaterialized
477 }
478 crate::db::executor::route::GroupedExecutionStrategy::OrderedGroup => {
479 GroupedPlanMetricsStrategy::OrderedStreaming
480 }
481 };
482 debug_assert!(
483 grouped_route_eligible == grouped_route_rejection_reason.is_none(),
484 "grouped route eligibility and rejection reason must stay aligned",
485 );
486 debug_assert!(
487 grouped_route_outcome
488 != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
489 || grouped_route_rejection_reason.is_some(),
490 "grouped rejected outcomes must carry a rejection reason",
491 );
492 debug_assert!(
493 matches!(
494 grouped_route_execution_mode,
495 crate::db::executor::route::ExecutionMode::Materialized
496 ),
497 "grouped execution route must remain blocking/materialized",
498 );
499 let mut grouped_engines = grouped_handoff
500 .aggregates()
501 .iter()
502 .map(|aggregate| {
503 if aggregate.target_field().is_some() {
504 return Err(InternalError::query_executor_invariant(format!(
505 "grouped field-target aggregate reached executor after planning: {:?}",
506 aggregate.kind()
507 )));
508 }
509
510 Ok(grouped_execution_context.create_grouped_engine::<E>(
511 aggregate.kind(),
512 aggregate_materialized_fold_direction(aggregate.kind()),
513 ))
514 })
515 .collect::<Result<Vec<_>, _>>()?;
516 let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
517 let plan = plan.into_inner();
518 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
519
520 let mut span = Span::<E>::new(ExecKind::Load);
521 let ctx = self.db.recovered_context::<E>()?;
522 let execution_inputs = ExecutionInputs {
523 ctx: &ctx,
524 plan: &plan,
525 stream_bindings: AccessStreamBindings {
526 index_prefix_specs: index_prefix_specs.as_slice(),
527 index_range_specs: index_range_specs.as_slice(),
528 index_range_anchor: None,
529 direction,
530 },
531 execution_preparation: &execution_preparation,
532 };
533 record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
534 let mut resolved = Self::resolve_execution_key_stream_without_distinct(
535 &execution_inputs,
536 &grouped_route_plan,
537 IndexCompilePolicy::ConservativeSubset,
538 )?;
539 let mut scanned_rows = 0usize;
540 let mut filtered_rows = 0usize;
541 let compiled_predicate = execution_preparation.compiled_predicate();
542
543 while let Some(key) = resolved.key_stream.next_key()? {
545 let row = match plan.scalar_plan().consistency {
546 MissingRowPolicy::Error => ctx.read_strict(&key),
547 MissingRowPolicy::Ignore => ctx.read(&key),
548 };
549 let row = match row {
550 Ok(row) => row,
551 Err(err) if err.is_not_found() => continue,
552 Err(err) => return Err(err),
553 };
554 scanned_rows = scanned_rows.saturating_add(1);
555 let (id, entity) = Context::<E>::deserialize_row((key, row))?;
556 if let Some(compiled_predicate) = compiled_predicate
557 && !compiled_predicate.eval(&entity)
558 {
559 continue;
560 }
561 filtered_rows = filtered_rows.saturating_add(1);
562
563 let group_values = group_fields
564 .iter()
565 .map(|field| {
566 entity.get_value_by_index(field.index()).ok_or_else(|| {
567 InternalError::query_executor_invariant(format!(
568 "grouped field slot missing on entity: index={}",
569 field.index()
570 ))
571 })
572 })
573 .collect::<Result<Vec<_>, _>>()?;
574 let group_key = Value::List(group_values)
575 .canonical_key()
576 .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
577 let canonical_group_value = group_key.canonical_value().clone();
578 let data_key = DataKey::try_new::<E>(id.key())?;
579
580 for (index, engine) in grouped_engines.iter_mut().enumerate() {
581 if short_circuit_keys[index].iter().any(|done| {
582 canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
583 }) {
584 continue;
585 }
586
587 let fold_control = engine
588 .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
589 .map_err(Self::map_group_error)?;
590 if matches!(fold_control, FoldControl::Break) {
591 short_circuit_keys[index].push(canonical_group_value.clone());
592 }
593 }
594 }
595
596 let aggregate_count = grouped_engines.len();
601 if aggregate_count == 0 {
602 return Err(InternalError::query_executor_invariant(
603 "grouped execution requires at least one aggregate terminal",
604 ));
605 }
606 let mut finalized_iters = grouped_engines
607 .into_iter()
608 .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
609 .collect::<Result<Vec<_>, _>>()?;
610 let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
611 InternalError::query_executor_invariant("missing grouped primary iterator")
612 })?;
613
614 let initial_offset = plan
616 .scalar_plan()
617 .page
618 .as_ref()
619 .map_or(0, |page| page.offset);
620 let resume_initial_offset = if cursor.is_empty() {
621 initial_offset
622 } else {
623 cursor.initial_offset()
624 };
625 let resume_boundary = cursor
626 .last_group_key()
627 .map(|last_group_key| Value::List(last_group_key.to_vec()));
628 let apply_initial_offset = cursor.is_empty();
629 let limit = plan
630 .scalar_plan()
631 .page
632 .as_ref()
633 .and_then(|page| page.limit)
634 .and_then(|limit| usize::try_from(limit).ok());
635 let initial_offset_for_page = if apply_initial_offset {
636 usize::try_from(initial_offset).unwrap_or(usize::MAX)
637 } else {
638 0
639 };
640 let selection_bound = limit.and_then(|limit| {
641 limit
642 .checked_add(initial_offset_for_page)
643 .and_then(|count| count.checked_add(1))
644 });
645 let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
646 if limit.is_none_or(|limit| limit != 0) {
647 for primary_output in primary_iter.by_ref() {
648 let group_key_value = primary_output.group_key().canonical_value().clone();
649 let mut aggregate_values = Vec::with_capacity(aggregate_count);
650 aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
651 for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
652 let sibling_output = sibling_iter.next().ok_or_else(|| {
653 InternalError::query_executor_invariant(format!(
654 "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
655 ))
656 })?;
657 let sibling_group_key = sibling_output.group_key().canonical_value();
658 if canonical_value_compare(sibling_group_key, &group_key_value)
659 != Ordering::Equal
660 {
661 return Err(InternalError::query_executor_invariant(format!(
662 "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
663 )));
664 }
665 aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
666 }
667 debug_assert_eq!(
668 aggregate_values.len(),
669 aggregate_count,
670 "grouped aggregate value alignment must preserve declared aggregate count",
671 );
672 if let Some(grouped_having) = grouped_having.as_ref()
673 && !Self::group_matches_having(
674 grouped_having,
675 group_fields.as_slice(),
676 &group_key_value,
677 aggregate_values.as_slice(),
678 )?
679 {
680 continue;
681 }
682
683 if let Some(resume_boundary) = resume_boundary.as_ref()
684 && canonical_value_compare(&group_key_value, resume_boundary)
685 != Ordering::Greater
686 {
687 continue;
688 }
689
690 if let Some(selection_bound) = selection_bound {
693 match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
694 canonical_value_compare(existing_key, &group_key_value)
695 }) {
696 Ok(_) => {
697 return Err(InternalError::query_executor_invariant(format!(
698 "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
699 )));
700 }
701 Err(insert_index) => {
702 grouped_candidate_rows
703 .insert(insert_index, (group_key_value, aggregate_values));
704 if grouped_candidate_rows.len() > selection_bound {
705 let _ = grouped_candidate_rows.pop();
706 }
707 }
708 }
709 } else {
710 grouped_candidate_rows.push((group_key_value, aggregate_values));
711 }
712 }
713 for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
714 if sibling_iter.next().is_some() {
715 return Err(InternalError::query_executor_invariant(format!(
716 "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
717 )));
718 }
719 }
720 if selection_bound.is_none() {
721 grouped_candidate_rows
722 .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
723 }
724 }
725
726 let mut page_rows = Vec::<GroupedRow>::new();
727 let mut last_emitted_group_key: Option<Vec<Value>> = None;
728 let mut has_more = false;
729 let mut groups_skipped_for_offset = 0usize;
730 for (group_key_value, aggregate_values) in grouped_candidate_rows {
731 if groups_skipped_for_offset < initial_offset_for_page {
732 groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
733 continue;
734 }
735 if let Some(limit) = limit
736 && page_rows.len() >= limit
737 {
738 has_more = true;
739 break;
740 }
741
742 let emitted_group_key = match group_key_value {
743 Value::List(values) => values,
744 value => {
745 return Err(InternalError::query_executor_invariant(format!(
746 "grouped canonical key must be Value::List, found {value:?}"
747 )));
748 }
749 };
750 last_emitted_group_key = Some(emitted_group_key.clone());
751 page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
752 }
753
754 let next_cursor = if has_more {
755 last_emitted_group_key.map(|last_group_key| {
756 PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
757 continuation_signature,
758 last_group_key,
759 Direction::Asc,
760 resume_initial_offset,
761 ))
762 })
763 } else {
764 None
765 };
766 let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
767 let optimization = resolved.optimization;
768 let index_predicate_applied = resolved.index_predicate_applied;
769 let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
770 let distinct_keys_deduped = resolved
771 .distinct_keys_deduped_counter
772 .as_ref()
773 .map_or(0, |counter| counter.get());
774 let rows_returned = page_rows.len();
775
776 Self::finalize_path_outcome(
777 &mut execution_trace,
778 optimization,
779 rows_scanned,
780 rows_returned,
781 index_predicate_applied,
782 index_predicate_keys_rejected,
783 distinct_keys_deduped,
784 );
785 span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
786 debug_assert!(
787 filtered_rows >= rows_returned,
788 "grouped pagination must return at most filtered row cardinality",
789 );
790
791 Ok((
792 GroupedCursorPage {
793 rows: page_rows,
794 next_cursor,
795 },
796 execution_trace,
797 ))
798 }
799
800 fn map_group_error(err: GroupError) -> InternalError {
802 match err {
803 GroupError::MemoryLimitExceeded { .. } => {
804 InternalError::executor_internal(err.to_string())
805 }
806 GroupError::Internal(inner) => inner,
807 }
808 }
809
810 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
812 match output {
813 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
814 AggregateOutput::Exists(value) => Value::Bool(*value),
815 AggregateOutput::Min(value)
816 | AggregateOutput::Max(value)
817 | AggregateOutput::First(value)
818 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
819 }
820 }
821
822 fn group_matches_having(
824 having: &GroupHavingSpec,
825 group_fields: &[crate::db::query::plan::FieldSlot],
826 group_key_value: &Value,
827 aggregate_values: &[Value],
828 ) -> Result<bool, InternalError> {
829 for (index, clause) in having.clauses().iter().enumerate() {
830 let actual = match clause.symbol() {
831 GroupHavingSymbol::GroupField(field_slot) => {
832 let group_key_list = match group_key_value {
833 Value::List(values) => values,
834 value => {
835 return Err(InternalError::query_executor_invariant(format!(
836 "grouped HAVING requires list-shaped grouped keys, found {value:?}"
837 )));
838 }
839 };
840 let Some(group_field_offset) = group_fields
841 .iter()
842 .position(|group_field| group_field.index() == field_slot.index())
843 else {
844 return Err(InternalError::query_executor_invariant(format!(
845 "grouped HAVING field is not in grouped key projection: field='{}'",
846 field_slot.field()
847 )));
848 };
849 group_key_list.get(group_field_offset).ok_or_else(|| {
850 InternalError::query_executor_invariant(format!(
851 "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
852 group_key_list.len()
853 ))
854 })?
855 }
856 GroupHavingSymbol::AggregateIndex(aggregate_index) => {
857 aggregate_values.get(*aggregate_index).ok_or_else(|| {
858 InternalError::query_executor_invariant(format!(
859 "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
860 aggregate_values.len()
861 ))
862 })?
863 }
864 };
865
866 if !Self::having_compare_values(actual, clause.op(), clause.value())? {
867 return Ok(false);
868 }
869 }
870
871 Ok(true)
872 }
873
874 fn having_compare_values(
876 actual: &Value,
877 op: CompareOp,
878 expected: &Value,
879 ) -> Result<bool, InternalError> {
880 let strict = CoercionSpec::default();
881 let matches = match op {
882 CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
883 CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
884 CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
885 CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
886 CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
887 CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
888 CompareOp::In
889 | CompareOp::NotIn
890 | CompareOp::Contains
891 | CompareOp::StartsWith
892 | CompareOp::EndsWith => {
893 return Err(InternalError::query_executor_invariant(format!(
894 "unsupported grouped HAVING operator reached executor: {op:?}"
895 )));
896 }
897 };
898
899 Ok(matches)
900 }
901
902 fn finalize_path_outcome(
904 execution_trace: &mut Option<ExecutionTrace>,
905 optimization: Option<ExecutionOptimization>,
906 rows_scanned: usize,
907 rows_returned: usize,
908 index_predicate_applied: bool,
909 index_predicate_keys_rejected: u64,
910 distinct_keys_deduped: u64,
911 ) {
912 record_rows_scanned::<E>(rows_scanned);
913 if let Some(execution_trace) = execution_trace.as_mut() {
914 execution_trace.set_path_outcome(
915 optimization,
916 rows_scanned,
917 rows_returned,
918 index_predicate_applied,
919 index_predicate_keys_rejected,
920 distinct_keys_deduped,
921 );
922 debug_assert_eq!(
923 execution_trace.keys_scanned,
924 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
925 "execution trace keys_scanned must match rows_scanned metrics input",
926 );
927 }
928 }
929
930 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
932 plan: &AccessPlannedQuery<E::Key>,
933 cursor_boundary: Option<&CursorBoundary>,
934 ) -> Result<(), InternalError> {
935 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
936 return Ok(());
937 }
938 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
939
940 Ok(())
941 }
942}