1mod execute;
2mod fast_stream;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod secondary_index;
7mod terminal;
8mod trace;
9
10pub(in crate::db::executor) use self::execute::{
11 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
12};
13
14use self::trace::{access_path_variant, execution_order_direction};
15use crate::{
16 db::{
17 Context, Db, GroupedRow,
18 access::AccessPlan,
19 contracts::canonical_value_compare,
20 cursor::{
21 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
22 PlannedCursor, decode_pk_cursor_boundary,
23 },
24 data::DataKey,
25 direction::Direction,
26 executor::{
27 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
28 KeyOrderComparator, OrderedKeyStreamBox,
29 aggregate::field::{
30 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
31 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
32 },
33 aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
34 group::{
35 CanonicalKey, grouped_budget_observability,
36 grouped_execution_context_from_planner_config,
37 },
38 plan_metrics::{record_plan_metrics, record_rows_scanned},
39 range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
40 },
41 index::IndexCompilePolicy,
42 policy,
43 query::plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
44 response::Response,
45 },
46 error::InternalError,
47 obs::sink::{ExecKind, Span},
48 traits::{EntityKind, EntityValue},
49 value::Value,
50};
51use std::{cmp::Ordering, marker::PhantomData, ops::Deref};
52
53#[derive(Clone, Debug, Eq, PartialEq)]
59pub(in crate::db) enum PageCursor {
60 Scalar(ContinuationToken),
61 Grouped(GroupedContinuationToken),
62}
63
64impl PageCursor {
65 #[must_use]
66 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
67 match self {
68 Self::Scalar(token) => Some(token),
69 Self::Grouped(_) => None,
70 }
71 }
72
73 #[must_use]
74 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
75 match self {
76 Self::Scalar(_) => None,
77 Self::Grouped(token) => Some(token),
78 }
79 }
80
81 fn scalar_or_panic(&self) -> &ContinuationToken {
83 match self {
84 Self::Scalar(token) => token,
85 Self::Grouped(_) => {
86 panic!("grouped continuation cursor cannot be accessed as scalar token")
87 }
88 }
89 }
90}
91
92impl Deref for PageCursor {
93 type Target = ContinuationToken;
94
95 fn deref(&self) -> &Self::Target {
96 self.scalar_or_panic()
97 }
98}
99
100impl From<ContinuationToken> for PageCursor {
101 fn from(value: ContinuationToken) -> Self {
102 Self::Scalar(value)
103 }
104}
105
106impl From<GroupedContinuationToken> for PageCursor {
107 fn from(value: GroupedContinuationToken) -> Self {
108 Self::Grouped(value)
109 }
110}
111
112#[derive(Debug)]
120pub(crate) struct CursorPage<E: EntityKind> {
121 pub(crate) items: Response<E>,
122
123 pub(crate) next_cursor: Option<PageCursor>,
124}
125
126#[derive(Debug)]
132pub(in crate::db) struct GroupedCursorPage {
133 pub(in crate::db) rows: Vec<GroupedRow>,
134 pub(in crate::db) next_cursor: Option<PageCursor>,
135}
136
/// Label describing which access-path shape an execution used.
///
/// The value is stored in `ExecutionTrace::access_path_variant`.
/// `Hash` is derived alongside `Eq` so the label can key a `HashMap` or
/// `HashSet` when traces are aggregated.
// NOTE(review): variant names presumably mirror the access-plan shapes one to
// one; confirm against the `trace::access_path_variant` mapping.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
154
/// Label for the optimization reported by an execution.
///
/// The value is stored in `ExecutionTrace::optimization`; `None` there means
/// no optimization was reported.
/// `Hash` is derived alongside `Eq` so the label can key a `HashMap` or
/// `HashSet` when traces are aggregated.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
167
168#[derive(Clone, Copy, Debug, Eq, PartialEq)]
176pub struct ExecutionTrace {
177 pub access_path_variant: ExecutionAccessPathVariant,
178 pub direction: OrderDirection,
179 pub optimization: Option<ExecutionOptimization>,
180 pub keys_scanned: u64,
181 pub rows_returned: u64,
182 pub continuation_applied: bool,
183 pub index_predicate_applied: bool,
184 pub index_predicate_keys_rejected: u64,
185 pub distinct_keys_deduped: u64,
186}
187
188impl ExecutionTrace {
189 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190 Self {
191 access_path_variant: access_path_variant(access),
192 direction: execution_order_direction(direction),
193 optimization: None,
194 keys_scanned: 0,
195 rows_returned: 0,
196 continuation_applied,
197 index_predicate_applied: false,
198 index_predicate_keys_rejected: 0,
199 distinct_keys_deduped: 0,
200 }
201 }
202
203 fn set_path_outcome(
204 &mut self,
205 optimization: Option<ExecutionOptimization>,
206 keys_scanned: usize,
207 rows_returned: usize,
208 index_predicate_applied: bool,
209 index_predicate_keys_rejected: u64,
210 distinct_keys_deduped: u64,
211 ) {
212 self.optimization = optimization;
213 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215 self.index_predicate_applied = index_predicate_applied;
216 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217 self.distinct_keys_deduped = distinct_keys_deduped;
218 }
219}
220
221pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
222 direction: Direction,
223) -> KeyOrderComparator {
224 KeyOrderComparator::from_direction(direction)
225}
226
227pub(in crate::db::executor) struct FastPathKeyResult {
235 pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
236 pub(in crate::db::executor) rows_scanned: usize,
237 pub(in crate::db::executor) optimization: ExecutionOptimization,
238}
239
240#[derive(Clone)]
248pub(crate) struct LoadExecutor<E: EntityKind> {
249 db: Db<E::Canister>,
250 debug: bool,
251 _marker: PhantomData<E>,
252}
253
254impl<E> LoadExecutor<E>
255where
256 E: EntityKind + EntityValue,
257{
258 #[must_use]
259 pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
260 Self {
261 db,
262 debug,
263 _marker: PhantomData,
264 }
265 }
266
267 pub(in crate::db::executor) fn recovered_context(
269 &self,
270 ) -> Result<crate::db::Context<'_, E>, InternalError> {
271 self.db.recovered_context::<E>()
272 }
273
274 pub(in crate::db::executor) fn resolve_orderable_field_slot(
277 target_field: &str,
278 ) -> Result<FieldSlot, InternalError> {
279 resolve_orderable_aggregate_target_slot::<E>(target_field)
280 .map_err(AggregateFieldValueError::into_internal_error)
281 }
282
283 pub(in crate::db::executor) fn resolve_any_field_slot(
286 target_field: &str,
287 ) -> Result<FieldSlot, InternalError> {
288 resolve_any_aggregate_target_slot::<E>(target_field)
289 .map_err(AggregateFieldValueError::into_internal_error)
290 }
291
292 pub(in crate::db::executor) fn resolve_numeric_field_slot(
295 target_field: &str,
296 ) -> Result<FieldSlot, InternalError> {
297 resolve_numeric_aggregate_target_slot::<E>(target_field)
298 .map_err(AggregateFieldValueError::into_internal_error)
299 }
300
301 pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
302 self.execute_paged_with_cursor(plan, PlannedCursor::none())
303 .map(|page| page.items)
304 }
305
306 pub(in crate::db) fn execute_paged_with_cursor(
307 &self,
308 plan: ExecutablePlan<E>,
309 cursor: impl Into<PlannedCursor>,
310 ) -> Result<CursorPage<E>, InternalError> {
311 self.execute_paged_with_cursor_traced(plan, cursor)
312 .map(|(page, _)| page)
313 }
314
315 pub(in crate::db) fn execute_paged_with_cursor_traced(
316 &self,
317 plan: ExecutablePlan<E>,
318 cursor: impl Into<PlannedCursor>,
319 ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
320 if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
321 return Err(InternalError::query_executor_invariant(
322 "grouped plans require execute_grouped pagination entrypoints",
323 ));
324 }
325
326 let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
327 let cursor_boundary = cursor.boundary().cloned();
328 let index_range_token = cursor
329 .index_range_anchor()
330 .map(range_token_from_cursor_anchor);
331
332 if !plan.mode().is_load() {
333 return Err(InternalError::query_executor_invariant(
334 "load executor requires load plans",
335 ));
336 }
337 debug_assert!(
338 policy::validate_plan_shape(&plan.as_inner().logical).is_ok(),
339 "load executor received a plan shape that bypassed planning validation",
340 );
341
342 let continuation_signature = plan.continuation_signature();
343 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
344 let index_range_specs = plan.index_range_specs()?.to_vec();
345 let route_plan = Self::build_execution_route_plan_for_load(
346 plan.as_inner(),
347 cursor_boundary.as_ref(),
348 index_range_token.as_ref(),
349 None,
350 )?;
351 let continuation_applied = !matches!(
352 route_plan.continuation_mode(),
353 crate::db::executor::route::ContinuationMode::Initial
354 );
355 let direction = route_plan.direction();
356 debug_assert_eq!(
357 route_plan.window().effective_offset,
358 ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
359 "route window effective offset must match logical plan offset semantics",
360 );
361 let mut execution_trace = self
362 .debug
363 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
364 let plan = plan.into_inner();
365 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
366
367 let result = (|| {
368 let mut span = Span::<E>::new(ExecKind::Load);
369
370 validate_executor_plan::<E>(&plan)?;
371 let ctx = self.db.recovered_context::<E>()?;
372 let execution_inputs = ExecutionInputs {
373 ctx: &ctx,
374 plan: &plan,
375 stream_bindings: AccessStreamBindings {
376 index_prefix_specs: index_prefix_specs.as_slice(),
377 index_range_specs: index_range_specs.as_slice(),
378 index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
379 direction,
380 },
381 execution_preparation: &execution_preparation,
382 };
383
384 record_plan_metrics(&plan.access);
385 let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
388 &execution_inputs,
389 &route_plan,
390 cursor_boundary.as_ref(),
391 continuation_signature,
392 IndexCompilePolicy::ConservativeSubset,
393 )?;
394 let page = materialized.page;
395 let rows_scanned = materialized.rows_scanned;
396 let post_access_rows = materialized.post_access_rows;
397 let optimization = materialized.optimization;
398 let index_predicate_applied = materialized.index_predicate_applied;
399 let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
400 let distinct_keys_deduped = materialized.distinct_keys_deduped;
401
402 Ok(Self::finalize_execution(
403 page,
404 optimization,
405 rows_scanned,
406 post_access_rows,
407 index_predicate_applied,
408 index_predicate_keys_rejected,
409 distinct_keys_deduped,
410 &mut span,
411 &mut execution_trace,
412 ))
413 })();
414
415 result.map(|page| (page, execution_trace))
416 }
417
418 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
419 &self,
420 plan: ExecutablePlan<E>,
421 cursor: impl Into<GroupedPlannedCursor>,
422 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
423 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
424 return Err(InternalError::query_executor_invariant(
425 "grouped execution requires grouped logical plans",
426 ));
427 }
428
429 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
430
431 self.execute_grouped_path(plan, cursor)
432 }
433
434 #[expect(clippy::too_many_lines)]
436 fn execute_grouped_path(
437 &self,
438 plan: ExecutablePlan<E>,
439 cursor: GroupedPlannedCursor,
440 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
441 validate_executor_plan::<E>(plan.as_inner())?;
442 let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
443 let grouped_execution = grouped_handoff.execution();
444 let group_fields = grouped_handoff.group_fields().to_vec();
445 let grouped_spec = Self::lower_grouped_spec_for_executor_contract(&grouped_handoff);
446 let grouped_route_plan =
447 Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
448 let grouped_route_observability =
449 grouped_route_plan.grouped_observability().ok_or_else(|| {
450 InternalError::query_executor_invariant(
451 "grouped route planning must emit grouped observability payload",
452 )
453 })?;
454 let direction = grouped_route_plan.direction();
455 let continuation_applied = !cursor.is_empty();
456 let mut execution_trace = self
457 .debug
458 .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
459 let continuation_signature = plan.continuation_signature();
460 let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
461 let index_range_specs = plan.index_range_specs()?.to_vec();
462
463 grouped_spec
464 .ensure_supported_for_execution()
465 .map_err(|err| InternalError::executor_unsupported(err.to_string()))?;
466
467 let mut grouped_execution_context =
468 grouped_execution_context_from_planner_config(Some(grouped_execution));
469 let grouped_budget = grouped_budget_observability(&grouped_execution_context);
470 debug_assert!(
471 grouped_budget.max_groups() >= grouped_budget.groups()
472 && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
473 && grouped_budget.aggregate_states() >= grouped_budget.groups(),
474 "grouped budget observability invariants must hold at grouped route entry"
475 );
476
477 let grouped_route_outcome = grouped_route_observability.outcome();
479 let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
480 let grouped_route_eligible = grouped_route_observability.eligible();
481 let grouped_route_execution_mode = grouped_route_observability.execution_mode();
482 debug_assert!(
483 grouped_route_eligible == grouped_route_rejection_reason.is_none(),
484 "grouped route eligibility and rejection reason must stay aligned",
485 );
486 debug_assert!(
487 grouped_route_outcome
488 != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
489 || grouped_route_rejection_reason.is_some(),
490 "grouped rejected outcomes must carry a rejection reason",
491 );
492 debug_assert!(
493 matches!(
494 grouped_route_execution_mode,
495 crate::db::executor::route::ExecutionMode::Materialized
496 ),
497 "grouped execution route must remain blocking/materialized",
498 );
499 let plan = plan.into_inner();
500 let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
501 let mut grouped_states = grouped_spec
502 .aggregate_specs()
503 .iter()
504 .map(|spec| {
505 if spec.target_field().is_some() {
506 return Err(InternalError::executor_unsupported(format!(
507 "grouped field-target aggregates are not yet enabled: {:?}",
508 spec.kind()
509 )));
510 }
511
512 Ok(grouped_execution_context.create_grouped_state::<E>(
513 spec.kind(),
514 Self::grouped_reduction_direction(spec.kind()),
515 ))
516 })
517 .collect::<Result<Vec<_>, _>>()?;
518 let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_states.len()];
519
520 let mut span = Span::<E>::new(ExecKind::Load);
521 let ctx = self.db.recovered_context::<E>()?;
522 let execution_inputs = ExecutionInputs {
523 ctx: &ctx,
524 plan: &plan,
525 stream_bindings: AccessStreamBindings {
526 index_prefix_specs: index_prefix_specs.as_slice(),
527 index_range_specs: index_range_specs.as_slice(),
528 index_range_anchor: None,
529 direction,
530 },
531 execution_preparation: &execution_preparation,
532 };
533 record_plan_metrics(&plan.access);
534 let mut resolved = Self::resolve_execution_key_stream_without_distinct(
535 &execution_inputs,
536 &grouped_route_plan,
537 IndexCompilePolicy::ConservativeSubset,
538 )?;
539 let data_rows = ctx.rows_from_ordered_key_stream(
540 resolved.key_stream.as_mut(),
541 plan.scalar_plan().consistency,
542 )?;
543 let scanned_rows = data_rows.len();
544 let mut rows = Context::<E>::deserialize_rows(data_rows)?;
545 if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
546 rows.retain(|row| compiled_predicate.eval(&row.1));
547 }
548 let filtered_rows = rows.len();
549
550 for (id, entity) in &rows {
552 let group_values = group_fields
553 .iter()
554 .map(|field| {
555 entity.get_value_by_index(field.index()).ok_or_else(|| {
556 InternalError::query_executor_invariant(format!(
557 "grouped field slot missing on entity: index={}",
558 field.index()
559 ))
560 })
561 })
562 .collect::<Result<Vec<_>, _>>()?;
563 let group_key = Value::List(group_values)
564 .canonical_key()
565 .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
566 let canonical_group_value = group_key.canonical_value().clone();
567 let data_key = DataKey::try_new::<E>(id.key())?;
568
569 for (index, state) in grouped_states.iter_mut().enumerate() {
570 if short_circuit_keys[index].iter().any(|done| {
571 canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
572 }) {
573 continue;
574 }
575
576 let fold_control = state
577 .apply(group_key.clone(), &data_key, &mut grouped_execution_context)
578 .map_err(Self::map_group_error)?;
579 if matches!(fold_control, FoldControl::Break) {
580 short_circuit_keys[index].push(canonical_group_value.clone());
581 }
582 }
583 }
584
585 let aggregate_count = grouped_states.len();
587 let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
588 for (index, state) in grouped_states.into_iter().enumerate() {
589 let finalized = state.finalize();
590 for output in finalized {
591 let group_key = output.group_key().canonical_value().clone();
592 let aggregate_value = Self::aggregate_output_to_value(output.output());
593 if let Some((_, existing_aggregates)) =
594 grouped_rows_by_key
595 .iter_mut()
596 .find(|(existing_group_key, _)| {
597 canonical_value_compare(existing_group_key, &group_key)
598 == Ordering::Equal
599 })
600 {
601 if let Some(slot) = existing_aggregates.get_mut(index) {
602 *slot = aggregate_value;
603 }
604 } else {
605 let mut aggregates = vec![Value::Null; aggregate_count];
606 if let Some(slot) = aggregates.get_mut(index) {
607 *slot = aggregate_value;
608 }
609 grouped_rows_by_key.push((group_key, aggregates));
610 }
611 }
612 }
613 grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
614
615 let initial_offset = plan
617 .scalar_plan()
618 .page
619 .as_ref()
620 .map_or(0, |page| page.offset);
621 let resume_initial_offset = if cursor.is_empty() {
622 initial_offset
623 } else {
624 cursor.initial_offset()
625 };
626 let resume_boundary = cursor
627 .last_group_key()
628 .map(|last_group_key| Value::List(last_group_key.to_vec()));
629 let apply_initial_offset = cursor.is_empty();
630 let mut groups_skipped_for_offset = 0u32;
631 let limit = plan
632 .scalar_plan()
633 .page
634 .as_ref()
635 .and_then(|page| page.limit)
636 .and_then(|limit| usize::try_from(limit).ok());
637 let mut page_rows = Vec::<GroupedRow>::new();
638 let mut last_emitted_group_key: Option<Vec<Value>> = None;
639 let mut has_more = false;
640 for (group_key_value, aggregate_values) in grouped_rows_by_key {
641 if let Some(resume_boundary) = resume_boundary.as_ref()
642 && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
643 {
644 continue;
645 }
646 if apply_initial_offset && groups_skipped_for_offset < initial_offset {
647 groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
648 continue;
649 }
650 if limit.is_some_and(|limit| limit == 0) {
651 break;
652 }
653 if let Some(limit) = limit
654 && page_rows.len() >= limit
655 {
656 has_more = true;
657 break;
658 }
659
660 let emitted_group_key = match group_key_value {
661 Value::List(values) => values,
662 value => {
663 return Err(InternalError::query_executor_invariant(format!(
664 "grouped canonical key must be Value::List, found {value:?}"
665 )));
666 }
667 };
668 last_emitted_group_key = Some(emitted_group_key.clone());
669 page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
670 }
671
672 let next_cursor = if has_more {
673 last_emitted_group_key.map(|last_group_key| {
674 PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
675 continuation_signature,
676 last_group_key,
677 Direction::Asc,
678 resume_initial_offset,
679 ))
680 })
681 } else {
682 None
683 };
684 let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
685 let optimization = resolved.optimization;
686 let index_predicate_applied = resolved.index_predicate_applied;
687 let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
688 let distinct_keys_deduped = resolved
689 .distinct_keys_deduped_counter
690 .as_ref()
691 .map_or(0, |counter| counter.get());
692 let rows_returned = page_rows.len();
693
694 Self::finalize_path_outcome(
695 &mut execution_trace,
696 optimization,
697 rows_scanned,
698 rows_returned,
699 index_predicate_applied,
700 index_predicate_keys_rejected,
701 distinct_keys_deduped,
702 );
703 span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
704 debug_assert!(
705 filtered_rows >= rows_returned,
706 "grouped pagination must return at most filtered row cardinality",
707 );
708
709 Ok((
710 GroupedCursorPage {
711 rows: page_rows,
712 next_cursor,
713 },
714 execution_trace,
715 ))
716 }
717
718 fn map_group_error(err: GroupError) -> InternalError {
720 match err {
721 GroupError::MemoryLimitExceeded { .. } => {
722 InternalError::executor_internal(err.to_string())
723 }
724 GroupError::Internal(inner) => inner,
725 }
726 }
727
728 const fn grouped_reduction_direction(kind: AggregateKind) -> Direction {
730 match kind {
731 AggregateKind::Min => Direction::Desc,
732 AggregateKind::Max
733 | AggregateKind::Count
734 | AggregateKind::Exists
735 | AggregateKind::First
736 | AggregateKind::Last => Direction::Asc,
737 }
738 }
739
740 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
742 match output {
743 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
744 AggregateOutput::Exists(value) => Value::Bool(*value),
745 AggregateOutput::Min(value)
746 | AggregateOutput::Max(value)
747 | AggregateOutput::First(value)
748 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
749 }
750 }
751
752 fn finalize_path_outcome(
754 execution_trace: &mut Option<ExecutionTrace>,
755 optimization: Option<ExecutionOptimization>,
756 rows_scanned: usize,
757 rows_returned: usize,
758 index_predicate_applied: bool,
759 index_predicate_keys_rejected: u64,
760 distinct_keys_deduped: u64,
761 ) {
762 record_rows_scanned::<E>(rows_scanned);
763 if let Some(execution_trace) = execution_trace.as_mut() {
764 execution_trace.set_path_outcome(
765 optimization,
766 rows_scanned,
767 rows_returned,
768 index_predicate_applied,
769 index_predicate_keys_rejected,
770 distinct_keys_deduped,
771 );
772 debug_assert_eq!(
773 execution_trace.keys_scanned,
774 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
775 "execution trace keys_scanned must match rows_scanned metrics input",
776 );
777 }
778 }
779
780 pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
782 plan: &AccessPlannedQuery<E::Key>,
783 cursor_boundary: Option<&CursorBoundary>,
784 ) -> Result<(), InternalError> {
785 if !Self::pk_order_stream_fast_path_shape_supported(plan) {
786 return Ok(());
787 }
788 let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
789
790 Ok(())
791 }
792}