1mod execute;
2mod fast_stream;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod secondary_index;
7mod terminal;
8mod trace;
9
10pub(in crate::db::executor) use self::execute::{
11 ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
12};
13
14use self::trace::{access_path_variant, execution_order_direction};
15use crate::{
16 db::{
17 Context, Db, GroupedRow,
18 access::AccessPlan,
19 contracts::canonical_value_compare,
20 cursor::{
21 ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
22 PlannedCursor, decode_pk_cursor_boundary,
23 },
24 data::DataKey,
25 direction::Direction,
26 executor::{
27 AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
28 KeyOrderComparator, OrderedKeyStreamBox,
29 aggregate::field::{
30 AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
31 resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
32 },
33 aggregate::{
34 AggregateOutput, FoldControl, GroupError,
35 ensure_grouped_spec_supported_for_execution,
36 },
37 group::{
38 CanonicalKey, grouped_budget_observability,
39 grouped_execution_context_from_planner_config,
40 },
41 plan_metrics::{record_plan_metrics, record_rows_scanned},
42 range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
43 },
44 index::IndexCompilePolicy,
45 query::{
46 plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
47 policy,
48 },
49 response::Response,
50 },
51 error::InternalError,
52 obs::sink::{ExecKind, Span},
53 traits::{EntityKind, EntityValue},
54 value::Value,
55};
56use std::{cmp::Ordering, marker::PhantomData};
57
/// Continuation cursor handed back with a page of results.
///
/// Scalar (row-per-entity) queries resume from a [`ContinuationToken`];
/// grouped/aggregated queries resume from a [`GroupedContinuationToken`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Cursor for scalar pagination.
    Scalar(ContinuationToken),
    /// Cursor for grouped pagination.
    Grouped(GroupedContinuationToken),
}
68
69impl PageCursor {
70 #[must_use]
71 pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
72 match self {
73 Self::Scalar(token) => Some(token),
74 Self::Grouped(_) => None,
75 }
76 }
77
78 #[must_use]
79 pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
80 match self {
81 Self::Scalar(_) => None,
82 Self::Grouped(token) => Some(token),
83 }
84 }
85}
86
87impl From<ContinuationToken> for PageCursor {
88 fn from(value: ContinuationToken) -> Self {
89 Self::Scalar(value)
90 }
91}
92
93impl From<GroupedContinuationToken> for PageCursor {
94 fn from(value: GroupedContinuationToken) -> Self {
95 Self::Grouped(value)
96 }
97}
98
/// One page of scalar query results plus the cursor needed to fetch the next page.
#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    // Rows materialized for this page.
    pub(crate) items: Response<E>,

    // Cursor to resume after this page; `None` when the result set is exhausted.
    pub(crate) next_cursor: Option<PageCursor>,
}
112
/// One page of grouped (group-key + aggregate values) results plus its continuation cursor.
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    // Grouped rows emitted for this page, in canonical group-key order.
    pub(in crate::db) rows: Vec<GroupedRow>,
    // Cursor to resume after this page; `None` when all groups were emitted.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
123
/// Shape of the access path the executor actually ran, surfaced via [`ExecutionTrace`].
///
/// Derived from the planner's `AccessPlan` by `trace::access_path_variant`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single primary-key lookup.
    ByKey,
    /// Multi primary-key lookup.
    ByKeys,
    /// Primary-key range scan.
    KeyRange,
    /// Secondary-index prefix scan.
    IndexPrefix,
    /// Secondary-index range scan.
    IndexRange,
    /// Full table scan.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
141
/// Fast-path optimization the executor applied for a query, if any.
///
/// Reported through [`ExecutionTrace::optimization`] for debug observability.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key ordered streaming fast path.
    PrimaryKey,
    /// Ordering pushed down into a secondary-index scan.
    SecondaryOrderPushdown,
    /// Limit pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
154
/// Debug-mode observability record describing how a single execution ran.
///
/// Populated only when the executor was constructed with `debug = true`;
/// counters are saturated at `u64::MAX` rather than overflowing.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Shape of the access path that was executed.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Effective scan direction for the execution.
    pub direction: OrderDirection,
    /// Fast-path optimization applied, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Number of keys scanned while producing the page.
    pub keys_scanned: u64,
    /// Number of rows returned in the page.
    pub rows_returned: u64,
    /// Whether a continuation cursor was applied (i.e. not an initial page).
    pub continuation_applied: bool,
    /// Whether a compiled index predicate filtered keys during the scan.
    pub index_predicate_applied: bool,
    /// Keys rejected by the index predicate.
    pub index_predicate_keys_rejected: u64,
    /// Duplicate keys removed by DISTINCT handling.
    pub distinct_keys_deduped: u64,
}
174
175impl ExecutionTrace {
176 fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
177 Self {
178 access_path_variant: access_path_variant(access),
179 direction: execution_order_direction(direction),
180 optimization: None,
181 keys_scanned: 0,
182 rows_returned: 0,
183 continuation_applied,
184 index_predicate_applied: false,
185 index_predicate_keys_rejected: 0,
186 distinct_keys_deduped: 0,
187 }
188 }
189
190 fn set_path_outcome(
191 &mut self,
192 optimization: Option<ExecutionOptimization>,
193 keys_scanned: usize,
194 rows_returned: usize,
195 index_predicate_applied: bool,
196 index_predicate_keys_rejected: u64,
197 distinct_keys_deduped: u64,
198 ) {
199 self.optimization = optimization;
200 self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
201 self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
202 self.index_predicate_applied = index_predicate_applied;
203 self.index_predicate_keys_rejected = index_predicate_keys_rejected;
204 self.distinct_keys_deduped = distinct_keys_deduped;
205 }
206}
207
/// Returns the key-order comparator matching `direction`.
///
/// Thin delegation kept as a named entry point for executor call sites.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
213
/// Result of resolving keys through an executor fast path.
pub(in crate::db::executor) struct FastPathKeyResult {
    // Keys already in requested order, ready for row materialization.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    // Rows scanned while producing the stream (feeds scan metrics/trace).
    pub(in crate::db::executor) rows_scanned: usize,
    // Which fast-path optimization produced this stream.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
226
/// Executor for load (read) plans against a single entity kind `E`.
///
/// Cloning is cheap as far as this struct is concerned; it holds only the
/// database handle, a debug flag, and a zero-sized marker.
#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Handle to the backing database for `E`'s canister.
    db: Db<E::Canister>,
    // When true, executions also produce an `ExecutionTrace`.
    debug: bool,
    // Ties the executor to `E` without storing a value of it.
    _marker: PhantomData<E>,
}
240
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    /// Creates a load executor bound to `db`; `debug` enables trace capture.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    /// Recovers a typed execution context for `E` from the database handle.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }

    /// Resolves `target_field` to an orderable aggregate field slot,
    /// mapping resolution failures to internal errors.
    pub(in crate::db::executor) fn resolve_orderable_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    /// Resolves `target_field` to a field slot accepting any value kind.
    pub(in crate::db::executor) fn resolve_any_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    /// Resolves `target_field` to a numeric aggregate field slot.
    pub(in crate::db::executor) fn resolve_numeric_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    /// Executes `plan` without pagination and returns all items of the
    /// (single) page. Convenience wrapper over the cursor entry point.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    /// Executes `plan` with cursor pagination, discarding the debug trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    /// Executes a scalar (non-grouped) load plan with cursor pagination.
    ///
    /// Returns the page plus an [`ExecutionTrace`] when the executor was
    /// built with `debug = true` (otherwise `None`).
    ///
    /// # Errors
    /// Returns an invariant error for grouped plans (those must go through
    /// [`Self::execute_grouped_paged_with_cursor_traced`]) or non-load plans,
    /// and propagates cursor revalidation and execution failures.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Grouped plans have their own pagination semantics; reject early.
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        // Revalidate the caller-supplied cursor against this plan's signature
        // before trusting any boundary/anchor it carries.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(&plan.as_inner().logical).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning decides direction, windowing, and continuation mode.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace is only allocated in debug mode.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Closure isolates the fallible execution so the trace (mutated inside)
        // can still be returned alongside the page afterwards.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Kernel does the actual scan/materialization (with an optional
            // residual-predicate retry pass).
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // finalize_execution records metrics/span and fills the trace.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    /// Executes a grouped (aggregate) plan with grouped cursor pagination.
    ///
    /// # Errors
    /// Returns an invariant error when the plan is not grouped; propagates
    /// cursor revalidation and grouped execution failures.
    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<GroupedPlannedCursor>,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped execution requires grouped logical plans",
            ));
        }

        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;

        self.execute_grouped_path(plan, cursor)
    }

    /// Grouped execution pipeline: materialize rows, fold them into per-group
    /// aggregate engines, finalize per group, then paginate the sorted groups.
    ///
    /// Grouped execution is fully materialized (blocking) — see the
    /// `ExecutionMode::Materialized` debug assertion below.
    #[expect(clippy::too_many_lines)]
    fn execute_grouped_path(
        &self,
        plan: ExecutablePlan<E>,
        cursor: GroupedPlannedCursor,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        validate_executor_plan::<E>(plan.as_inner())?;
        // Handoff carries the group fields / aggregates the planner approved.
        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
        let grouped_execution = grouped_handoff.execution();
        let group_fields = grouped_handoff.group_fields().to_vec();
        let grouped_route_plan =
            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
        let grouped_route_observability =
            grouped_route_plan.grouped_observability().ok_or_else(|| {
                InternalError::query_executor_invariant(
                    "grouped route planning must emit grouped observability payload",
                )
            })?;
        let direction = grouped_route_plan.direction();
        let continuation_applied = !cursor.is_empty();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();

        ensure_grouped_spec_supported_for_execution(
            grouped_handoff.group_fields(),
            grouped_handoff.aggregates(),
        )
        .map_err(|err| InternalError::executor_unsupported(err.to_string()))?;

        let mut grouped_execution_context =
            grouped_execution_context_from_planner_config(Some(grouped_execution));
        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
        debug_assert!(
            grouped_budget.max_groups() >= grouped_budget.groups()
                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
            "grouped budget observability invariants must hold at grouped route entry"
        );

        // Sanity-check the route decision payload before executing.
        let grouped_route_outcome = grouped_route_observability.outcome();
        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
        let grouped_route_eligible = grouped_route_observability.eligible();
        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
        debug_assert!(
            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
            "grouped route eligibility and rejection reason must stay aligned",
        );
        debug_assert!(
            grouped_route_outcome
                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
                || grouped_route_rejection_reason.is_some(),
            "grouped rejected outcomes must carry a rejection reason",
        );
        debug_assert!(
            matches!(
                grouped_route_execution_mode,
                crate::db::executor::route::ExecutionMode::Materialized
            ),
            "grouped execution route must remain blocking/materialized",
        );
        // One fold engine per aggregate; field-targeted aggregates should have
        // been rewritten away by the planner before reaching the executor.
        let mut grouped_engines = grouped_handoff
            .aggregates()
            .iter()
            .map(|aggregate| {
                if aggregate.target_field().is_some() {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped field-target aggregate reached executor after planning: {:?}",
                        aggregate.kind()
                    )));
                }

                Ok(grouped_execution_context.create_grouped_engine::<E>(
                    aggregate.kind(),
                    aggregate.kind().materialized_fold_direction(),
                ))
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Per-engine list of group keys whose fold short-circuited (FoldControl::Break),
        // so later rows for that group are skipped for that engine.
        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        let mut span = Span::<E>::new(ExecKind::Load);
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: None,
                direction,
            },
            execution_preparation: &execution_preparation,
        };
        record_plan_metrics(&plan.access);
        // Resolve the key stream and materialize candidate rows; distinct
        // handling is deliberately skipped here (grouping dedupes by key).
        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
            &execution_inputs,
            &grouped_route_plan,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let data_rows = ctx.rows_from_ordered_key_stream(
            resolved.key_stream.as_mut(),
            plan.scalar_plan().consistency,
        )?;
        let scanned_rows = data_rows.len();
        let mut rows = Context::<E>::deserialize_rows(data_rows)?;
        // Apply the residual (non-index) predicate, if the plan compiled one.
        if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
            rows.retain(|row| compiled_predicate.eval(&row.1));
        }
        let filtered_rows = rows.len();

        // Fold every surviving row into every aggregate engine under its
        // canonical group key.
        for (id, entity) in &rows {
            let group_values = group_fields
                .iter()
                .map(|field| {
                    entity.get_value_by_index(field.index()).ok_or_else(|| {
                        InternalError::query_executor_invariant(format!(
                            "grouped field slot missing on entity: index={}",
                            field.index()
                        ))
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let group_key = Value::List(group_values)
                .canonical_key()
                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
            let canonical_group_value = group_key.canonical_value().clone();
            let data_key = DataKey::try_new::<E>(id.key())?;

            for (index, engine) in grouped_engines.iter_mut().enumerate() {
                // Skip groups this engine already finished (short-circuited).
                if short_circuit_keys[index].iter().any(|done| {
                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
                }) {
                    continue;
                }

                let fold_control = engine
                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                    .map_err(Self::map_group_error)?;
                if matches!(fold_control, FoldControl::Break) {
                    short_circuit_keys[index].push(canonical_group_value.clone());
                }
            }
        }

        // Finalize each engine and pivot outputs into rows keyed by group:
        // one Vec<Value> per group with one slot per aggregate (Null-padded).
        let aggregate_count = grouped_engines.len();
        let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
        for (index, engine) in grouped_engines.into_iter().enumerate() {
            let finalized = engine.finalize_grouped()?;
            for output in finalized {
                let group_key = output.group_key().canonical_value().clone();
                let aggregate_value = Self::aggregate_output_to_value(output.output());
                if let Some((_, existing_aggregates)) =
                    grouped_rows_by_key
                        .iter_mut()
                        .find(|(existing_group_key, _)| {
                            canonical_value_compare(existing_group_key, &group_key)
                                == Ordering::Equal
                        })
                {
                    if let Some(slot) = existing_aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                } else {
                    let mut aggregates = vec![Value::Null; aggregate_count];
                    if let Some(slot) = aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                    grouped_rows_by_key.push((group_key, aggregates));
                }
            }
        }
        // Stable canonical ascending order makes pagination deterministic.
        grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));

        // Pagination: the initial offset is only applied on the first page;
        // resumed pages skip groups <= the cursor's last emitted group key.
        let initial_offset = plan
            .scalar_plan()
            .page
            .as_ref()
            .map_or(0, |page| page.offset);
        let resume_initial_offset = if cursor.is_empty() {
            initial_offset
        } else {
            cursor.initial_offset()
        };
        let resume_boundary = cursor
            .last_group_key()
            .map(|last_group_key| Value::List(last_group_key.to_vec()));
        let apply_initial_offset = cursor.is_empty();
        let mut groups_skipped_for_offset = 0u32;
        let limit = plan
            .scalar_plan()
            .page
            .as_ref()
            .and_then(|page| page.limit)
            .and_then(|limit| usize::try_from(limit).ok());
        let mut page_rows = Vec::<GroupedRow>::new();
        let mut last_emitted_group_key: Option<Vec<Value>> = None;
        let mut has_more = false;
        for (group_key_value, aggregate_values) in grouped_rows_by_key {
            // Skip everything at or before the resume boundary.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
            {
                continue;
            }
            if apply_initial_offset && groups_skipped_for_offset < initial_offset {
                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
                continue;
            }
            // limit == 0 yields an empty page with no continuation.
            if limit.is_some_and(|limit| limit == 0) {
                break;
            }
            // Page full: the current group proves more data exists.
            if let Some(limit) = limit
                && page_rows.len() >= limit
            {
                has_more = true;
                break;
            }

            let emitted_group_key = match group_key_value {
                Value::List(values) => values,
                value => {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped canonical key must be Value::List, found {value:?}"
                    )));
                }
            };
            last_emitted_group_key = Some(emitted_group_key.clone());
            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        }

        // Emit a continuation only when the scan proved more groups exist.
        // Direction is fixed Asc because groups are sorted ascending above.
        let next_cursor = if has_more {
            last_emitted_group_key.map(|last_group_key| {
                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                    continuation_signature,
                    last_group_key,
                    Direction::Asc,
                    resume_initial_offset,
                ))
            })
        } else {
            None
        };
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
        debug_assert!(
            filtered_rows >= rows_returned,
            "grouped pagination must return at most filtered row cardinality",
        );

        Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor,
            },
            execution_trace,
        ))
    }

    /// Maps grouped fold errors into internal errors; budget overruns are
    /// surfaced as executor-internal with their message preserved.
    fn map_group_error(err: GroupError) -> InternalError {
        match err {
            GroupError::MemoryLimitExceeded { .. } => {
                InternalError::executor_internal(err.to_string())
            }
            GroupError::Internal(inner) => inner,
        }
    }

    /// Converts a finalized aggregate output to a plain [`Value`];
    /// absent Min/Max/First/Last become `Value::Null`.
    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
        match output {
            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
            AggregateOutput::Exists(value) => Value::Bool(*value),
            AggregateOutput::Min(value)
            | AggregateOutput::Max(value)
            | AggregateOutput::First(value)
            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
        }
    }

    /// Records scan metrics and, when tracing is enabled, writes the path
    /// outcome into the trace.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        // Metrics are recorded unconditionally; the trace only in debug mode.
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    /// When the plan shape qualifies for the primary-key ordered fast path,
    /// validates that any cursor boundary decodes as a PK boundary; a no-op
    /// otherwise. Decoded value is discarded — only validity matters here.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}