1mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21 db::{
22 Context, Db, GroupedRow,
23 access::AccessPlan,
24 contracts::canonical_value_compare,
25 cursor::{
26 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27 PlannedCursor, decode_pk_cursor_boundary,
28 },
29 data::DataKey,
30 direction::Direction,
31 executor::{
32 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33 KeyOrderComparator, OrderedKeyStreamBox,
34 aggregate::field::{
35 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
36 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
37 },
38 aggregate::{AggregateOutput, FoldControl, GroupError},
39 group::{
40 CanonicalKey, grouped_budget_observability,
41 grouped_execution_context_from_planner_config,
42 },
43 plan_metrics::{record_plan_metrics, record_rows_scanned},
44 range_token_anchor_key, range_token_from_cursor_anchor,
45 route::aggregate_materialized_fold_direction,
46 validate_executor_plan,
47 },
48 index::IndexCompilePolicy,
49 query::plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
50 response::Response,
51 },
52 error::InternalError,
53 obs::sink::{ExecKind, Span},
54 traits::{EntityKind, EntityValue},
55 value::Value,
56};
57use std::{cmp::Ordering, marker::PhantomData};
58
/// Continuation cursor handed back with a page of results: either a scalar
/// (row-level) token or a grouped (group-key-level) token, matching the two
/// pagination entrypoints of the load executor.
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Continuation for scalar (non-grouped) pagination.
    Scalar(ContinuationToken),
    /// Continuation for grouped pagination.
    Grouped(GroupedContinuationToken),
}
69
70impl PageCursor {
71 #[must_use]
73 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
74 match self {
75 Self::Scalar(token) => Some(token),
76 Self::Grouped(_) => None,
77 }
78 }
79
80 #[must_use]
82 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
83 match self {
84 Self::Scalar(_) => None,
85 Self::Grouped(token) => Some(token),
86 }
87 }
88}
89
// Scalar tokens lift directly into the scalar cursor variant.
impl From<ContinuationToken> for PageCursor {
    fn from(value: ContinuationToken) -> Self {
        Self::Scalar(value)
    }
}
95
// Grouped tokens lift directly into the grouped cursor variant.
impl From<GroupedContinuationToken> for PageCursor {
    fn from(value: GroupedContinuationToken) -> Self {
        Self::Grouped(value)
    }
}
101
/// One page of scalar load results plus the continuation needed to fetch the
/// next page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized entities for this page.
    pub(crate) items: Response<E>,

    /// Cursor for the next page; `None` when the result set is exhausted.
    pub(crate) next_cursor: Option<PageCursor>,
}
115
/// One page of grouped (aggregate-per-group) results plus the continuation
/// needed to fetch the next page of groups.
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    /// Emitted group rows (group key plus one value per aggregate).
    pub(in crate::db) rows: Vec<GroupedRow>,
    /// Cursor for the next page; `None` when no further groups remain.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
126
/// Which access-path shape the executor actually ran, mirrored from the
/// planned `AccessPlan` via `access_path_variant` for trace reporting.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
144
/// Fast-path optimization the executor applied for a given plan, reported in
/// [`ExecutionTrace::optimization`] when tracing is enabled.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
157
/// Debug-mode observability snapshot of a single execution: access-path shape,
/// direction, applied optimization, and scan/return counters. Counters are
/// zeroed at construction and filled in once the path outcome is known.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    pub access_path_variant: ExecutionAccessPathVariant,
    pub direction: OrderDirection,
    pub optimization: Option<ExecutionOptimization>,
    // Populated from the executor's rows_scanned figure (see
    // `finalize_path_outcome`), saturated into u64.
    pub keys_scanned: u64,
    pub rows_returned: u64,
    pub continuation_applied: bool,
    pub index_predicate_applied: bool,
    pub index_predicate_keys_rejected: u64,
    pub distinct_keys_deduped: u64,
}
177
178impl ExecutionTrace {
179 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
180 Self {
181 access_path_variant: access_path_variant(access),
182 direction: execution_order_direction(direction),
183 optimization: None,
184 keys_scanned: 0,
185 rows_returned: 0,
186 continuation_applied,
187 index_predicate_applied: false,
188 index_predicate_keys_rejected: 0,
189 distinct_keys_deduped: 0,
190 }
191 }
192
193 fn set_path_outcome(
194 &mut self,
195 optimization: Option<ExecutionOptimization>,
196 keys_scanned: usize,
197 rows_returned: usize,
198 index_predicate_applied: bool,
199 index_predicate_keys_rejected: u64,
200 distinct_keys_deduped: u64,
201 ) {
202 self.optimization = optimization;
203 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
204 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
205 self.index_predicate_applied = index_predicate_applied;
206 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
207 self.distinct_keys_deduped = distinct_keys_deduped;
208 }
209}
210
/// Builds the key-ordering comparator matching the requested scan direction.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
217
/// Result of a fast-path key resolution: the ordered key stream to drain,
/// how many rows were scanned producing it, and which optimization fired.
pub(in crate::db::executor) struct FastPathKeyResult {
    /// Keys already ordered per the execution direction.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Rows touched while producing the stream (trace/metrics input).
    pub(in crate::db::executor) rows_scanned: usize,
    /// Fast-path optimization that produced this stream.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
230
/// Executor for load (read) plans over a single entity kind `E`.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Handle to the backing database for E's canister.
    db: Db<E::Canister>,
    // When true, executions also produce an `ExecutionTrace`.
    debug: bool,
    // Ties the executor to `E` without storing a value of it.
    _marker: PhantomData<E>,
}
244
245impl<E> LoadExecutor<E>
246where
247 E: EntityKind + EntityValue,
248{
    /// Creates a load executor bound to `db`; `debug` enables execution
    /// tracing on the paged entrypoints.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
258
    /// Obtains a recovered database context for `E` from the underlying `Db`.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }
265
    /// Resolves `target_field` to an orderable aggregate field slot on `E`,
    /// mapping resolution failures into `InternalError`.
    pub(in crate::db::executor) fn resolve_orderable_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
274
    /// Resolves `target_field` to an aggregate field slot on `E` with no
    /// type-class restriction, mapping failures into `InternalError`.
    pub(in crate::db::executor) fn resolve_any_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
283
    /// Resolves `target_field` to a numeric aggregate field slot on `E`,
    /// mapping resolution failures into `InternalError`.
    pub(in crate::db::executor) fn resolve_numeric_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
292
    /// Executes a scalar load plan without pagination: delegates to the
    /// cursor entrypoint with an empty cursor and discards the continuation.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }
297
    /// Executes a scalar load plan with cursor pagination, dropping the
    /// optional execution trace produced by the traced entrypoint.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }
306
    /// Executes a scalar load plan with cursor pagination, additionally
    /// returning an [`ExecutionTrace`] when the executor was built with
    /// `debug = true`.
    ///
    /// Rejects grouped logical plans (those must use the grouped entrypoint)
    /// and non-load plan modes. The cursor is revalidated against the plan
    /// before its boundary and index-range anchor are extracted.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        // Revalidate the incoming cursor against this plan, then pull out the
        // pieces the route planner and kernel need.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning decides direction, continuation mode, and window.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace is only allocated in debug mode; it is filled in by
        // finalize_execution inside the closure below.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Run the kernel inside a closure so the trace (captured mutably) can
        // be returned alongside either outcome.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Materialize the page; the kernel may retry with a residual
            // predicate depending on the index compile policy.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
405
406 pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
407 &self,
408 plan: ExecutablePlan<E>,
409 cursor: impl Into<GroupedPlannedCursor>,
410 ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
411 if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
412 return Err(InternalError::query_executor_invariant(
413 "grouped execution requires grouped logical plans",
414 ));
415 }
416
417 let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
418
419 self.execute_grouped_path(plan, cursor)
420 }
421
    /// Core grouped execution path: scans and filters rows, folds them into
    /// per-aggregate grouped engines, merges the finalized outputs into one
    /// row per group key, then applies boundary/offset/limit pagination and
    /// builds the next grouped continuation cursor.
    #[expect(clippy::too_many_lines)]
    fn execute_grouped_path(
        &self,
        plan: ExecutablePlan<E>,
        cursor: GroupedPlannedCursor,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        validate_executor_plan::<E>(plan.as_inner())?;
        // Handoff carries the grouped execution config, group fields, and
        // aggregate specs produced by the planner.
        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
        let grouped_execution = grouped_handoff.execution();
        let group_fields = grouped_handoff.group_fields().to_vec();
        let grouped_route_plan =
            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
        let grouped_route_observability =
            grouped_route_plan.grouped_observability().ok_or_else(|| {
                InternalError::query_executor_invariant(
                    "grouped route planning must emit grouped observability payload",
                )
            })?;
        let direction = grouped_route_plan.direction();
        let continuation_applied = !cursor.is_empty();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();

        let mut grouped_execution_context =
            grouped_execution_context_from_planner_config(Some(grouped_execution));
        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
        // Budget invariants checked before any rows are ingested.
        debug_assert!(
            grouped_budget.max_groups() >= grouped_budget.groups()
                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
            "grouped budget observability invariants must hold at grouped route entry"
        );

        // Route-decision observability sanity checks (debug builds only).
        let grouped_route_outcome = grouped_route_observability.outcome();
        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
        let grouped_route_eligible = grouped_route_observability.eligible();
        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
        debug_assert!(
            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
            "grouped route eligibility and rejection reason must stay aligned",
        );
        debug_assert!(
            grouped_route_outcome
                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
                || grouped_route_rejection_reason.is_some(),
            "grouped rejected outcomes must carry a rejection reason",
        );
        debug_assert!(
            matches!(
                grouped_route_execution_mode,
                crate::db::executor::route::ExecutionMode::Materialized
            ),
            "grouped execution route must remain blocking/materialized",
        );
        // One grouped engine per aggregate; field-targeted aggregates should
        // have been lowered away by the planner, so their presence is a bug.
        let mut grouped_engines = grouped_handoff
            .aggregates()
            .iter()
            .map(|aggregate| {
                if aggregate.target_field().is_some() {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped field-target aggregate reached executor after planning: {:?}",
                        aggregate.kind()
                    )));
                }

                Ok(grouped_execution_context.create_grouped_engine::<E>(
                    aggregate.kind(),
                    aggregate_materialized_fold_direction(aggregate.kind()),
                ))
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Per-engine list of group keys whose fold already short-circuited
        // (FoldControl::Break); those groups skip further ingestion.
        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        let mut span = Span::<E>::new(ExecKind::Load);
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: None,
                direction,
            },
            execution_preparation: &execution_preparation,
        };
        record_plan_metrics(&plan.access);
        // Resolve keys (distinct handling is not needed here), load the rows,
        // deserialize, and apply any compiled residual predicate.
        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
            &execution_inputs,
            &grouped_route_plan,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let data_rows = ctx.rows_from_ordered_key_stream(
            resolved.key_stream.as_mut(),
            plan.scalar_plan().consistency,
        )?;
        let scanned_rows = data_rows.len();
        let mut rows = Context::<E>::deserialize_rows(data_rows)?;
        if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
            rows.retain(|row| compiled_predicate.eval(&row.1));
        }
        let filtered_rows = rows.len();

        // Fold every surviving row into each grouped engine under its
        // canonical group key.
        for (id, entity) in &rows {
            let group_values = group_fields
                .iter()
                .map(|field| {
                    entity.get_value_by_index(field.index()).ok_or_else(|| {
                        InternalError::query_executor_invariant(format!(
                            "grouped field slot missing on entity: index={}",
                            field.index()
                        ))
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let group_key = Value::List(group_values)
                .canonical_key()
                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
            let canonical_group_value = group_key.canonical_value().clone();
            let data_key = DataKey::try_new::<E>(id.key())?;

            for (index, engine) in grouped_engines.iter_mut().enumerate() {
                // Skip groups this engine has already short-circuited on.
                if short_circuit_keys[index].iter().any(|done| {
                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
                }) {
                    continue;
                }

                let fold_control = engine
                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                    .map_err(Self::map_group_error)?;
                if matches!(fold_control, FoldControl::Break) {
                    short_circuit_keys[index].push(canonical_group_value.clone());
                }
            }
        }

        // Merge finalized engine outputs into one row per group key, with one
        // aggregate slot per engine (Null where an engine had no output).
        let aggregate_count = grouped_engines.len();
        let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
        for (index, engine) in grouped_engines.into_iter().enumerate() {
            let finalized = engine.finalize_grouped()?;
            for output in finalized {
                let group_key = output.group_key().canonical_value().clone();
                let aggregate_value = Self::aggregate_output_to_value(output.output());
                if let Some((_, existing_aggregates)) =
                    grouped_rows_by_key
                        .iter_mut()
                        .find(|(existing_group_key, _)| {
                            canonical_value_compare(existing_group_key, &group_key)
                                == Ordering::Equal
                        })
                {
                    if let Some(slot) = existing_aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                } else {
                    let mut aggregates = vec![Value::Null; aggregate_count];
                    if let Some(slot) = aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                    grouped_rows_by_key.push((group_key, aggregates));
                }
            }
        }
        // Groups are always emitted in canonical ascending key order.
        grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));

        // Pagination state: the initial offset applies only on the first
        // page; continuation pages resume strictly after the last group key.
        let initial_offset = plan
            .scalar_plan()
            .page
            .as_ref()
            .map_or(0, |page| page.offset);
        let resume_initial_offset = if cursor.is_empty() {
            initial_offset
        } else {
            cursor.initial_offset()
        };
        let resume_boundary = cursor
            .last_group_key()
            .map(|last_group_key| Value::List(last_group_key.to_vec()));
        let apply_initial_offset = cursor.is_empty();
        let mut groups_skipped_for_offset = 0u32;
        let limit = plan
            .scalar_plan()
            .page
            .as_ref()
            .and_then(|page| page.limit)
            .and_then(|limit| usize::try_from(limit).ok());
        let mut page_rows = Vec::<GroupedRow>::new();
        let mut last_emitted_group_key: Option<Vec<Value>> = None;
        let mut has_more = false;
        for (group_key_value, aggregate_values) in grouped_rows_by_key {
            // Skip groups at or before the continuation boundary.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
            {
                continue;
            }
            if apply_initial_offset && groups_skipped_for_offset < initial_offset {
                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
                continue;
            }
            if limit.is_some_and(|limit| limit == 0) {
                break;
            }
            if let Some(limit) = limit
                && page_rows.len() >= limit
            {
                has_more = true;
                break;
            }

            // Canonical group keys are built as Value::List above; anything
            // else indicates a broken canonicalization invariant.
            let emitted_group_key = match group_key_value {
                Value::List(values) => values,
                value => {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped canonical key must be Value::List, found {value:?}"
                    )));
                }
            };
            last_emitted_group_key = Some(emitted_group_key.clone());
            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        }

        // Emit a continuation only when the limit cut the page short.
        let next_cursor = if has_more {
            last_emitted_group_key.map(|last_group_key| {
                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                    continuation_signature,
                    last_group_key,
                    Direction::Asc,
                    resume_initial_offset,
                ))
            })
        } else {
            None
        };
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
        debug_assert!(
            filtered_rows >= rows_returned,
            "grouped pagination must return at most filtered row cardinality",
        );

        Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor,
            },
            execution_trace,
        ))
    }
700
701 fn map_group_error(err: GroupError) -> InternalError {
703 match err {
704 GroupError::MemoryLimitExceeded { .. } => {
705 InternalError::executor_internal(err.to_string())
706 }
707 GroupError::Internal(inner) => inner,
708 }
709 }
710
711 fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
713 match output {
714 AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
715 AggregateOutput::Exists(value) => Value::Bool(*value),
716 AggregateOutput::Min(value)
717 | AggregateOutput::Max(value)
718 | AggregateOutput::First(value)
719 | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
720 }
721 }
722
723 fn finalize_path_outcome(
725 execution_trace: &mut Option<ExecutionTrace>,
726 optimization: Option<ExecutionOptimization>,
727 rows_scanned: usize,
728 rows_returned: usize,
729 index_predicate_applied: bool,
730 index_predicate_keys_rejected: u64,
731 distinct_keys_deduped: u64,
732 ) {
733 record_rows_scanned::<E>(rows_scanned);
734 if let Some(execution_trace) = execution_trace.as_mut() {
735 execution_trace.set_path_outcome(
736 optimization,
737 rows_scanned,
738 rows_returned,
739 index_predicate_applied,
740 index_predicate_keys_rejected,
741 distinct_keys_deduped,
742 );
743 debug_assert_eq!(
744 execution_trace.keys_scanned,
745 u64::try_from(rows_scanned).unwrap_or(u64::MAX),
746 "execution trace keys_scanned must match rows_scanned metrics input",
747 );
748 }
749 }
750
    /// When the plan's shape supports the primary-key order-stream fast path,
    /// eagerly decodes the cursor boundary so malformed boundaries fail here
    /// instead of mid-stream; plans outside that shape are accepted as-is.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        // Decode purely for validation; the decoded boundary is discarded.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
763}