// File: icydb_core/db/executor/load/mod.rs

//! Module: executor::load
//! Responsibility: load-path execution orchestration, pagination, and trace contracts.
//! Does not own: logical planning semantics or relation/commit mutation policy.
//! Boundary: consumes executable load plans and delegates post-access semantics to kernel.
5#![deny(unreachable_patterns)]
6
7mod execute;
8mod fast_stream;
9mod index_range_limit;
10mod page;
11mod pk_stream;
12mod secondary_index;
13mod terminal;
14mod trace;
15
16pub(in crate::db::executor) use self::execute::{
17    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
18};
19
20use self::trace::{access_path_variant, execution_order_direction};
21use crate::{
22    db::{
23        Context, Db, GroupedRow,
24        access::AccessPlan,
25        contracts::canonical_value_compare,
26        cursor::{
27            ContinuationSignature, ContinuationToken, CursorBoundary, GroupedContinuationToken,
28            GroupedPlannedCursor, PlannedCursor, decode_pk_cursor_boundary,
29        },
30        data::DataKey,
31        direction::Direction,
32        executor::{
33            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
34            KeyOrderComparator, OrderedKeyStreamBox,
35            aggregate::field::{
36                AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
37                extract_orderable_field_value, resolve_any_aggregate_target_slot,
38                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
39            },
40            aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
41            group::{
42                CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
43                grouped_execution_context_from_planner_config,
44            },
45            plan_metrics::{
46                GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
47                record_rows_scanned,
48            },
49            range_token_anchor_key, range_token_from_cursor_anchor,
50            route::aggregate_materialized_fold_direction,
51            validate_executor_plan,
52        },
53        index::IndexCompilePolicy,
54        predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
55        query::plan::{
56            AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
57            LogicalPlan, OrderDirection, grouped_executor_handoff,
58        },
59        response::Response,
60    },
61    error::InternalError,
62    obs::sink::{ExecKind, Span},
63    traits::{EntityKind, EntityValue},
64    types::Decimal,
65    value::Value,
66};
67use std::{cmp::Ordering, marker::PhantomData};
68
69///
70/// PageCursor
71///
72/// Internal continuation cursor enum for scalar and grouped pagination.
73///
74#[derive(Clone, Debug, Eq, PartialEq)]
75pub(in crate::db) enum PageCursor {
76    Scalar(ContinuationToken),
77    Grouped(GroupedContinuationToken),
78}
79
80impl PageCursor {
81    /// Borrow scalar continuation token when this cursor is scalar-shaped.
82    #[must_use]
83    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
84        match self {
85            Self::Scalar(token) => Some(token),
86            Self::Grouped(_) => None,
87        }
88    }
89
90    /// Borrow grouped continuation token when this cursor is grouped-shaped.
91    #[must_use]
92    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
93        match self {
94            Self::Scalar(_) => None,
95            Self::Grouped(token) => Some(token),
96        }
97    }
98}
99
100impl From<ContinuationToken> for PageCursor {
101    fn from(value: ContinuationToken) -> Self {
102        Self::Scalar(value)
103    }
104}
105
106impl From<GroupedContinuationToken> for PageCursor {
107    fn from(value: GroupedContinuationToken) -> Self {
108        Self::Grouped(value)
109    }
110}
111
112///
113/// CursorPage
114///
115/// Internal load page result with continuation cursor payload.
116/// Returned by paged executor entrypoints.
117///
118
119#[derive(Debug)]
120pub(crate) struct CursorPage<E: EntityKind> {
121    pub(crate) items: Response<E>,
122
123    pub(crate) next_cursor: Option<PageCursor>,
124}
125
126///
127/// GroupedCursorPage
128///
129/// Internal grouped page result with grouped rows and continuation cursor payload.
130///
131#[derive(Debug)]
132pub(in crate::db) struct GroupedCursorPage {
133    pub(in crate::db) rows: Vec<GroupedRow>,
134    pub(in crate::db) next_cursor: Option<PageCursor>,
135}
136
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Recorded in [`ExecutionTrace::access_path_variant`].
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
154
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// "No optimization" is represented by `Option::None` at the use sites, not
/// by a variant of this enum.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
167
168///
169/// ExecutionTrace
170///
171/// Structured, opt-in load execution introspection snapshot.
172/// Captures plan-shape and execution decisions without changing semantics.
173///
174
175#[derive(Clone, Copy, Debug, Eq, PartialEq)]
176pub struct ExecutionTrace {
177    pub access_path_variant: ExecutionAccessPathVariant,
178    pub direction: OrderDirection,
179    pub optimization: Option<ExecutionOptimization>,
180    pub keys_scanned: u64,
181    pub rows_returned: u64,
182    pub continuation_applied: bool,
183    pub index_predicate_applied: bool,
184    pub index_predicate_keys_rejected: u64,
185    pub distinct_keys_deduped: u64,
186}
187
188impl ExecutionTrace {
189    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190        Self {
191            access_path_variant: access_path_variant(access),
192            direction: execution_order_direction(direction),
193            optimization: None,
194            keys_scanned: 0,
195            rows_returned: 0,
196            continuation_applied,
197            index_predicate_applied: false,
198            index_predicate_keys_rejected: 0,
199            distinct_keys_deduped: 0,
200        }
201    }
202
203    fn set_path_outcome(
204        &mut self,
205        optimization: Option<ExecutionOptimization>,
206        keys_scanned: usize,
207        rows_returned: usize,
208        index_predicate_applied: bool,
209        index_predicate_keys_rejected: u64,
210        distinct_keys_deduped: u64,
211    ) {
212        self.optimization = optimization;
213        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215        self.index_predicate_applied = index_predicate_applied;
216        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217        self.distinct_keys_deduped = distinct_keys_deduped;
218    }
219}
220
221/// Resolve key-stream comparator contract from runtime direction.
222pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
223    direction: Direction,
224) -> KeyOrderComparator {
225    KeyOrderComparator::from_direction(direction)
226}
227
228///
229/// FastPathKeyResult
230///
231/// Internal fast-path access result.
232/// Carries ordered keys plus observability metadata for shared execution phases.
233///
234
235pub(in crate::db::executor) struct FastPathKeyResult {
236    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
237    pub(in crate::db::executor) rows_scanned: usize,
238    pub(in crate::db::executor) optimization: ExecutionOptimization,
239}
240
241///
242/// LoadExecutor
243///
244/// Load-plan executor with canonical post-access semantics.
245/// Coordinates fast paths, trace hooks, and pagination cursors.
246///
247
248#[derive(Clone)]
249pub(crate) struct LoadExecutor<E: EntityKind> {
250    db: Db<E::Canister>,
251    debug: bool,
252    _marker: PhantomData<E>,
253}
254
255///
256/// GroupedRouteStage
257///
258/// Route-planning stage payload for grouped execution.
259/// Owns grouped handoff extraction, grouped route contracts, and grouped
260/// execution metadata before runtime stream resolution starts.
261///
262
263struct GroupedRouteStage<E: EntityKind + EntityValue> {
264    plan: AccessPlannedQuery<E::Key>,
265    cursor: GroupedPlannedCursor,
266    direction: Direction,
267    continuation_signature: ContinuationSignature,
268    index_prefix_specs: Vec<crate::db::access::LoweredIndexPrefixSpec>,
269    index_range_specs: Vec<crate::db::access::LoweredIndexRangeSpec>,
270    grouped_execution: crate::db::query::plan::GroupedExecutionConfig,
271    group_fields: Vec<crate::db::query::plan::FieldSlot>,
272    grouped_aggregates: Vec<GroupAggregateSpec>,
273    grouped_having: Option<GroupHavingSpec>,
274    grouped_route_plan: crate::db::executor::ExecutionPlan,
275    grouped_plan_metrics_strategy: GroupedPlanMetricsStrategy,
276    global_distinct_field_aggregate: Option<(AggregateKind, String)>,
277    execution_trace: Option<ExecutionTrace>,
278}
279
280///
281/// GroupedStreamStage
282///
283/// Stream-construction stage payload for grouped execution.
284/// Owns recovered context, execution preparation, and resolved grouped key
285/// stream for fold-phase consumption.
286///
287
288struct GroupedStreamStage<'a, E: EntityKind + EntityValue> {
289    ctx: Context<'a, E>,
290    execution_preparation: ExecutionPreparation,
291    resolved: ResolvedExecutionKeyStream,
292}
293
294///
295/// GroupedFoldStage
296///
297/// Fold-phase output payload for grouped execution.
298/// Owns grouped page materialization plus observability counters consumed by
299/// the final output stage.
300///
301
302struct GroupedFoldStage {
303    page: GroupedCursorPage,
304    filtered_rows: usize,
305    check_filtered_rows_upper_bound: bool,
306    rows_scanned: usize,
307    optimization: Option<ExecutionOptimization>,
308    index_predicate_applied: bool,
309    index_predicate_keys_rejected: u64,
310    distinct_keys_deduped: u64,
311}
312
313impl<E> LoadExecutor<E>
314where
315    E: EntityKind + EntityValue,
316{
317    /// Construct one load executor bound to a database handle and debug mode.
318    #[must_use]
319    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
320        Self {
321            db,
322            debug,
323            _marker: PhantomData,
324        }
325    }
326
327    /// Recover one canonical read context for kernel-owned execution setup.
328    pub(in crate::db::executor) fn recovered_context(
329        &self,
330    ) -> Result<crate::db::Context<'_, E>, InternalError> {
331        self.db.recovered_context::<E>()
332    }
333
334    // Resolve one orderable aggregate target field into a stable slot with
335    // canonical field-error taxonomy mapping.
336    pub(in crate::db::executor) fn resolve_orderable_field_slot(
337        target_field: &str,
338    ) -> Result<FieldSlot, InternalError> {
339        resolve_orderable_aggregate_target_slot::<E>(target_field)
340            .map_err(AggregateFieldValueError::into_internal_error)
341    }
342
343    // Resolve one aggregate target field into a stable slot with canonical
344    // field-error taxonomy mapping.
345    pub(in crate::db::executor) fn resolve_any_field_slot(
346        target_field: &str,
347    ) -> Result<FieldSlot, InternalError> {
348        resolve_any_aggregate_target_slot::<E>(target_field)
349            .map_err(AggregateFieldValueError::into_internal_error)
350    }
351
352    // Resolve one numeric aggregate target field into a stable slot with
353    // canonical field-error taxonomy mapping.
354    pub(in crate::db::executor) fn resolve_numeric_field_slot(
355        target_field: &str,
356    ) -> Result<FieldSlot, InternalError> {
357        resolve_numeric_aggregate_target_slot::<E>(target_field)
358            .map_err(AggregateFieldValueError::into_internal_error)
359    }
360
361    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
362        self.execute_paged_with_cursor(plan, PlannedCursor::none())
363            .map(|page| page.items)
364    }
365
366    pub(in crate::db) fn execute_paged_with_cursor(
367        &self,
368        plan: ExecutablePlan<E>,
369        cursor: impl Into<PlannedCursor>,
370    ) -> Result<CursorPage<E>, InternalError> {
371        self.execute_paged_with_cursor_traced(plan, cursor)
372            .map(|(page, _)| page)
373    }
374
375    pub(in crate::db) fn execute_paged_with_cursor_traced(
376        &self,
377        plan: ExecutablePlan<E>,
378        cursor: impl Into<PlannedCursor>,
379    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
380        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
381            return Err(InternalError::query_executor_invariant(
382                "grouped plans require execute_grouped pagination entrypoints",
383            ));
384        }
385
386        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
387        let cursor_boundary = cursor.boundary().cloned();
388        let index_range_token = cursor
389            .index_range_anchor()
390            .map(range_token_from_cursor_anchor);
391
392        if !plan.mode().is_load() {
393            return Err(InternalError::query_executor_invariant(
394                "load executor requires load plans",
395            ));
396        }
397
398        let continuation_signature = plan.continuation_signature();
399        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
400        let index_range_specs = plan.index_range_specs()?.to_vec();
401        let route_plan = Self::build_execution_route_plan_for_load(
402            plan.as_inner(),
403            cursor_boundary.as_ref(),
404            index_range_token.as_ref(),
405            None,
406        )?;
407        let continuation_applied = !matches!(
408            route_plan.continuation_mode(),
409            crate::db::executor::route::ContinuationMode::Initial
410        );
411        let direction = route_plan.direction();
412        debug_assert_eq!(
413            route_plan.window().effective_offset,
414            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
415            "route window effective offset must match logical plan offset semantics",
416        );
417        let mut execution_trace = self
418            .debug
419            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
420        let plan = plan.into_inner();
421        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
422
423        let result = (|| {
424            let mut span = Span::<E>::new(ExecKind::Load);
425
426            validate_executor_plan::<E>(&plan)?;
427            let ctx = self.db.recovered_context::<E>()?;
428            let execution_inputs = ExecutionInputs {
429                ctx: &ctx,
430                plan: &plan,
431                stream_bindings: AccessStreamBindings {
432                    index_prefix_specs: index_prefix_specs.as_slice(),
433                    index_range_specs: index_range_specs.as_slice(),
434                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
435                    direction,
436                },
437                execution_preparation: &execution_preparation,
438            };
439
440            record_plan_metrics(&plan.access);
441            // Plan execution routing once, then execute in canonical order.
442            // Resolve one canonical key stream, then run shared page materialization/finalization.
443            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
444                &execution_inputs,
445                &route_plan,
446                cursor_boundary.as_ref(),
447                continuation_signature,
448                IndexCompilePolicy::ConservativeSubset,
449            )?;
450            let page = materialized.page;
451            let rows_scanned = materialized.rows_scanned;
452            let post_access_rows = materialized.post_access_rows;
453            let optimization = materialized.optimization;
454            let index_predicate_applied = materialized.index_predicate_applied;
455            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
456            let distinct_keys_deduped = materialized.distinct_keys_deduped;
457
458            Ok(Self::finalize_execution(
459                page,
460                optimization,
461                rows_scanned,
462                post_access_rows,
463                index_predicate_applied,
464                index_predicate_keys_rejected,
465                distinct_keys_deduped,
466                &mut span,
467                &mut execution_trace,
468            ))
469        })();
470
471        result.map(|page| (page, execution_trace))
472    }
473
474    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
475        &self,
476        plan: ExecutablePlan<E>,
477        cursor: impl Into<GroupedPlannedCursor>,
478    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
479        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
480            return Err(InternalError::query_executor_invariant(
481                "grouped execution requires grouped logical plans",
482            ));
483        }
484
485        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
486
487        self.execute_grouped_path(plan, cursor)
488    }
489
490    // Grouped execution spine:
491    // 1) resolve grouped route/metadata
492    // 2) build grouped key stream
493    // 3) execute grouped fold
494    // 4) finalize grouped output + observability
495    fn execute_grouped_path(
496        &self,
497        plan: ExecutablePlan<E>,
498        cursor: GroupedPlannedCursor,
499    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
500        let route = Self::resolve_grouped_route(plan, cursor, self.debug)?;
501        let stream = self.build_grouped_stream(&route)?;
502        let folded = Self::execute_group_fold(&route, stream)?;
503
504        Ok(Self::finalize_grouped_output(route, folded))
505    }
506
507    // Resolve grouped handoff/route metadata into one grouped route-stage payload.
508    fn resolve_grouped_route(
509        plan: ExecutablePlan<E>,
510        cursor: GroupedPlannedCursor,
511        debug: bool,
512    ) -> Result<GroupedRouteStage<E>, InternalError> {
513        validate_executor_plan::<E>(plan.as_inner())?;
514        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
515        let grouped_execution = grouped_handoff.execution();
516        let group_fields = grouped_handoff.group_fields().to_vec();
517        let grouped_aggregates = grouped_handoff.aggregates().to_vec();
518        let grouped_having = grouped_handoff.having().cloned();
519        let grouped_route_plan =
520            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
521        let grouped_route_observability =
522            grouped_route_plan.grouped_observability().ok_or_else(|| {
523                InternalError::query_executor_invariant(
524                    "grouped route planning must emit grouped observability payload",
525                )
526            })?;
527        let grouped_route_outcome = grouped_route_observability.outcome();
528        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
529        let grouped_route_eligible = grouped_route_observability.eligible();
530        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
531        let grouped_plan_metrics_strategy =
532            match grouped_route_observability.grouped_execution_strategy() {
533                crate::db::executor::route::GroupedExecutionStrategy::HashMaterialized => {
534                    GroupedPlanMetricsStrategy::HashMaterialized
535                }
536                crate::db::executor::route::GroupedExecutionStrategy::OrderedMaterialized => {
537                    GroupedPlanMetricsStrategy::OrderedMaterialized
538                }
539            };
540        debug_assert!(
541            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
542            "grouped route eligibility and rejection reason must stay aligned",
543        );
544        debug_assert!(
545            grouped_route_outcome
546                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
547                || grouped_route_rejection_reason.is_some(),
548            "grouped rejected outcomes must carry a rejection reason",
549        );
550        debug_assert!(
551            matches!(
552                grouped_route_execution_mode,
553                crate::db::executor::route::ExecutionMode::Materialized
554            ),
555            "grouped execution route must remain blocking/materialized",
556        );
557
558        let direction = grouped_route_plan.direction();
559        let continuation_applied = !cursor.is_empty();
560        let execution_trace =
561            debug.then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
562        let continuation_signature = plan.continuation_signature();
563        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
564        let index_range_specs = plan.index_range_specs()?.to_vec();
565        let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
566            group_fields.as_slice(),
567            grouped_aggregates.as_slice(),
568            grouped_having.as_ref(),
569        )?;
570        let plan = plan.into_inner();
571
572        Ok(GroupedRouteStage {
573            plan,
574            cursor,
575            direction,
576            continuation_signature,
577            index_prefix_specs,
578            index_range_specs,
579            grouped_execution,
580            group_fields,
581            grouped_aggregates,
582            grouped_having,
583            grouped_route_plan,
584            grouped_plan_metrics_strategy,
585            global_distinct_field_aggregate,
586            execution_trace,
587        })
588    }
589
590    // Build one grouped key stream from route-owned grouped execution metadata.
591    fn build_grouped_stream<'a>(
592        &'a self,
593        route: &'a GroupedRouteStage<E>,
594    ) -> Result<GroupedStreamStage<'a, E>, InternalError> {
595        let execution_preparation = ExecutionPreparation::for_plan::<E>(&route.plan);
596        let ctx = self.db.recovered_context::<E>()?;
597        let execution_inputs = ExecutionInputs {
598            ctx: &ctx,
599            plan: &route.plan,
600            stream_bindings: AccessStreamBindings {
601                index_prefix_specs: route.index_prefix_specs.as_slice(),
602                index_range_specs: route.index_range_specs.as_slice(),
603                index_range_anchor: None,
604                direction: route.direction,
605            },
606            execution_preparation: &execution_preparation,
607        };
608        record_grouped_plan_metrics(&route.plan.access, route.grouped_plan_metrics_strategy);
609        let resolved = Self::resolve_execution_key_stream_without_distinct(
610            &execution_inputs,
611            &route.grouped_route_plan,
612            IndexCompilePolicy::ConservativeSubset,
613        )?;
614
615        Ok(GroupedStreamStage {
616            ctx,
617            execution_preparation,
618            resolved,
619        })
620    }
621
622    // Execute grouped folding over one resolved grouped key stream.
623    #[expect(clippy::too_many_lines)]
624    fn execute_group_fold(
625        route: &GroupedRouteStage<E>,
626        mut stream: GroupedStreamStage<'_, E>,
627    ) -> Result<GroupedFoldStage, InternalError> {
628        let mut grouped_execution_context =
629            grouped_execution_context_from_planner_config(Some(route.grouped_execution));
630        let max_groups_bound =
631            usize::try_from(grouped_execution_context.config().max_groups()).unwrap_or(usize::MAX);
632        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
633        debug_assert!(
634            grouped_budget.max_groups() >= grouped_budget.groups()
635                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
636                && grouped_execution_context
637                    .config()
638                    .max_distinct_values_total()
639                    >= grouped_budget.distinct_values()
640                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
641            "grouped budget observability invariants must hold at grouped route entry",
642        );
643
644        let (mut grouped_engines, mut short_circuit_keys) =
645            if route.global_distinct_field_aggregate.is_none() {
646                let grouped_engines = route
647                .grouped_aggregates
648                .iter()
649                .map(|aggregate| {
650                    if aggregate.target_field().is_some() {
651                        return Err(InternalError::query_executor_invariant(format!(
652                            "grouped field-target aggregate reached executor after planning: {:?}",
653                            aggregate.kind()
654                        )));
655                    }
656
657                    Ok(grouped_execution_context.create_grouped_engine::<E>(
658                        aggregate.kind(),
659                        aggregate_materialized_fold_direction(aggregate.kind()),
660                        aggregate.distinct(),
661                    ))
662                })
663                .collect::<Result<Vec<_>, _>>()?;
664                let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
665
666                (grouped_engines, short_circuit_keys)
667            } else {
668                (Vec::new(), Vec::new())
669            };
670        let mut scanned_rows = 0usize;
671        let mut filtered_rows = 0usize;
672        let compiled_predicate = stream.execution_preparation.compiled_predicate();
673
674        if let Some((aggregate_kind, target_field)) = route.global_distinct_field_aggregate.as_ref()
675        {
676            if !route.cursor.is_empty() {
677                return Err(InternalError::from_cursor_plan_error(
678                    crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
679                        "global DISTINCT grouped aggregates do not support continuation cursors",
680                    ),
681                ));
682            }
683
684            let global_row = Self::execute_global_distinct_field_aggregate(
685                &route.plan,
686                &stream.ctx,
687                &mut stream.resolved,
688                compiled_predicate,
689                &mut grouped_execution_context,
690                (*aggregate_kind, target_field.as_str()),
691                (&mut scanned_rows, &mut filtered_rows),
692            )?;
693            let page_rows = Self::page_global_distinct_grouped_row(
694                global_row,
695                route.plan.scalar_plan().page.as_ref(),
696            );
697            let rows_scanned = stream
698                .resolved
699                .rows_scanned_override
700                .unwrap_or(scanned_rows);
701            let optimization = stream.resolved.optimization;
702            let index_predicate_applied = stream.resolved.index_predicate_applied;
703            let index_predicate_keys_rejected = stream.resolved.index_predicate_keys_rejected;
704            let distinct_keys_deduped = stream
705                .resolved
706                .distinct_keys_deduped_counter
707                .as_ref()
708                .map_or(0, |counter| counter.get());
709
710            return Ok(GroupedFoldStage {
711                page: GroupedCursorPage {
712                    rows: page_rows,
713                    next_cursor: None,
714                },
715                filtered_rows,
716                check_filtered_rows_upper_bound: false,
717                rows_scanned,
718                optimization,
719                index_predicate_applied,
720                index_predicate_keys_rejected,
721                distinct_keys_deduped,
722            });
723        }
724
725        // Phase 1: stream key->row reads, decode, predicate filtering, and grouped folding.
726        while let Some(key) = stream.resolved.key_stream.next_key()? {
727            let row = match route.plan.scalar_plan().consistency {
728                MissingRowPolicy::Error => stream.ctx.read_strict(&key),
729                MissingRowPolicy::Ignore => stream.ctx.read(&key),
730            };
731            let row = match row {
732                Ok(row) => row,
733                Err(err) if err.is_not_found() => continue,
734                Err(err) => return Err(err),
735            };
736            scanned_rows = scanned_rows.saturating_add(1);
737            let (id, entity) = Context::<E>::deserialize_row((key, row))?;
738            if let Some(compiled_predicate) = compiled_predicate
739                && !compiled_predicate.eval(&entity)
740            {
741                continue;
742            }
743            filtered_rows = filtered_rows.saturating_add(1);
744
745            let group_values = route
746                .group_fields
747                .iter()
748                .map(|field| {
749                    entity.get_value_by_index(field.index()).ok_or_else(|| {
750                        InternalError::query_executor_invariant(format!(
751                            "grouped field slot missing on entity: index={}",
752                            field.index()
753                        ))
754                    })
755                })
756                .collect::<Result<Vec<_>, _>>()?;
757            let group_key = Value::List(group_values)
758                .canonical_key()
759                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
760            let canonical_group_value = group_key.canonical_value().clone();
761            let data_key = DataKey::try_new::<E>(id.key())?;
762
763            for (index, engine) in grouped_engines.iter_mut().enumerate() {
764                if short_circuit_keys[index].iter().any(|done| {
765                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
766                }) {
767                    continue;
768                }
769
770                let fold_control = engine
771                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
772                    .map_err(Self::map_group_error)?;
773                if matches!(fold_control, FoldControl::Break) {
774                    short_circuit_keys[index].push(canonical_group_value.clone());
775                    debug_assert!(
776                        short_circuit_keys[index].len() <= max_groups_bound,
777                        "grouped short-circuit key tracking must stay bounded by max_groups",
778                    );
779                }
780            }
781        }
782
783        // Phase 2: finalize grouped aggregates per terminal and iterate groups in lock-step.
784        //
785        // This avoids constructing one additional full grouped `(key, aggregates)` buffer
786        // prior to pagination; we page directly while walking finalized grouped outputs.
787        let aggregate_count = grouped_engines.len();
788        if aggregate_count == 0 {
789            return Err(InternalError::query_executor_invariant(
790                "grouped execution requires at least one aggregate terminal",
791            ));
792        }
793        let mut finalized_iters = grouped_engines
794            .into_iter()
795            .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
796            .collect::<Result<Vec<_>, _>>()?;
797        let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
798            InternalError::query_executor_invariant("missing grouped primary iterator")
799        })?;
800
801        // Phase 3: apply grouped resume/offset/limit while finalizing grouped outputs.
802        let initial_offset = route
803            .plan
804            .scalar_plan()
805            .page
806            .as_ref()
807            .map_or(0, |page| page.offset);
808        let resume_initial_offset = if route.cursor.is_empty() {
809            initial_offset
810        } else {
811            route.cursor.initial_offset()
812        };
813        let resume_boundary = route
814            .cursor
815            .last_group_key()
816            .map(|last_group_key| Value::List(last_group_key.to_vec()));
817        let apply_initial_offset = route.cursor.is_empty();
818        let limit = route
819            .plan
820            .scalar_plan()
821            .page
822            .as_ref()
823            .and_then(|page| page.limit)
824            .and_then(|limit| usize::try_from(limit).ok());
825        let initial_offset_for_page = if apply_initial_offset {
826            usize::try_from(initial_offset).unwrap_or(usize::MAX)
827        } else {
828            0
829        };
830        let selection_bound = limit.and_then(|limit| {
831            limit
832                .checked_add(initial_offset_for_page)
833                .and_then(|count| count.checked_add(1))
834        });
835        let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
836        if limit.is_none_or(|limit| limit != 0) {
837            for primary_output in primary_iter.by_ref() {
838                let group_key_value = primary_output.group_key().canonical_value().clone();
839                let mut aggregate_values = Vec::with_capacity(aggregate_count);
840                aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
841                for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
842                    let sibling_output = sibling_iter.next().ok_or_else(|| {
843                        InternalError::query_executor_invariant(format!(
844                            "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
845                        ))
846                    })?;
847                    let sibling_group_key = sibling_output.group_key().canonical_value();
848                    if canonical_value_compare(sibling_group_key, &group_key_value)
849                        != Ordering::Equal
850                    {
851                        return Err(InternalError::query_executor_invariant(format!(
852                            "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
853                        )));
854                    }
855                    aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
856                }
857                debug_assert_eq!(
858                    aggregate_values.len(),
859                    aggregate_count,
860                    "grouped aggregate value alignment must preserve declared aggregate count",
861                );
862                if let Some(grouped_having) = route.grouped_having.as_ref()
863                    && !Self::group_matches_having(
864                        grouped_having,
865                        route.group_fields.as_slice(),
866                        &group_key_value,
867                        aggregate_values.as_slice(),
868                    )?
869                {
870                    continue;
871                }
872
873                if let Some(resume_boundary) = resume_boundary.as_ref()
874                    && canonical_value_compare(&group_key_value, resume_boundary)
875                        != Ordering::Greater
876                {
877                    continue;
878                }
879
880                // Keep only the smallest `offset + limit + 1` canonical grouped keys when
881                // paging is bounded so grouped LIMIT does not require one full grouped buffer.
882                if let Some(selection_bound) = selection_bound {
883                    match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
884                        canonical_value_compare(existing_key, &group_key_value)
885                    }) {
886                        Ok(_) => {
887                            return Err(InternalError::query_executor_invariant(format!(
888                                "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
889                            )));
890                        }
891                        Err(insert_index) => {
892                            grouped_candidate_rows
893                                .insert(insert_index, (group_key_value, aggregate_values));
894                            if grouped_candidate_rows.len() > selection_bound {
895                                let _ = grouped_candidate_rows.pop();
896                            }
897                            debug_assert!(
898                                grouped_candidate_rows.len() <= selection_bound,
899                                "bounded grouped candidate rows must stay <= selection_bound",
900                            );
901                        }
902                    }
903                } else {
904                    grouped_candidate_rows.push((group_key_value, aggregate_values));
905                    debug_assert!(
906                        grouped_candidate_rows.len() <= max_groups_bound,
907                        "grouped candidate rows must stay bounded by max_groups",
908                    );
909                }
910            }
911            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
912                if sibling_iter.next().is_some() {
913                    return Err(InternalError::query_executor_invariant(format!(
914                        "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
915                    )));
916                }
917            }
918            if selection_bound.is_none() {
919                grouped_candidate_rows
920                    .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
921            }
922        }
923        if let Some(selection_bound) = selection_bound {
924            debug_assert!(
925                grouped_candidate_rows.len() <= selection_bound,
926                "grouped candidate rows must remain bounded by selection_bound",
927            );
928        } else {
929            debug_assert!(
930                grouped_candidate_rows.len() <= max_groups_bound,
931                "grouped candidate rows must remain bounded by max_groups",
932            );
933        }
934
935        let mut page_rows = Vec::<GroupedRow>::new();
936        let mut last_emitted_group_key: Option<Vec<Value>> = None;
937        let mut has_more = false;
938        let mut groups_skipped_for_offset = 0usize;
939        for (group_key_value, aggregate_values) in grouped_candidate_rows {
940            if groups_skipped_for_offset < initial_offset_for_page {
941                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
942                continue;
943            }
944            if let Some(limit) = limit
945                && page_rows.len() >= limit
946            {
947                has_more = true;
948                break;
949            }
950
951            let emitted_group_key = match group_key_value {
952                Value::List(values) => values,
953                value => {
954                    return Err(InternalError::query_executor_invariant(format!(
955                        "grouped canonical key must be Value::List, found {value:?}"
956                    )));
957                }
958            };
959            last_emitted_group_key = Some(emitted_group_key.clone());
960            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
961            debug_assert!(
962                limit.is_none_or(|bounded_limit| page_rows.len() <= bounded_limit),
963                "grouped page rows must not exceed explicit page limit",
964            );
965        }
966
967        let next_cursor = if has_more {
968            last_emitted_group_key.map(|last_group_key| {
969                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
970                    route.continuation_signature,
971                    last_group_key,
972                    Direction::Asc,
973                    resume_initial_offset,
974                ))
975            })
976        } else {
977            None
978        };
979        let rows_scanned = stream
980            .resolved
981            .rows_scanned_override
982            .unwrap_or(scanned_rows);
983        let optimization = stream.resolved.optimization;
984        let index_predicate_applied = stream.resolved.index_predicate_applied;
985        let index_predicate_keys_rejected = stream.resolved.index_predicate_keys_rejected;
986        let distinct_keys_deduped = stream
987            .resolved
988            .distinct_keys_deduped_counter
989            .as_ref()
990            .map_or(0, |counter| counter.get());
991
992        Ok(GroupedFoldStage {
993            page: GroupedCursorPage {
994                rows: page_rows,
995                next_cursor,
996            },
997            filtered_rows,
998            check_filtered_rows_upper_bound: true,
999            rows_scanned,
1000            optimization,
1001            index_predicate_applied,
1002            index_predicate_keys_rejected,
1003            distinct_keys_deduped,
1004        })
1005    }
1006
1007    // Finalize grouped output payloads and observability after grouped fold execution.
1008    fn finalize_grouped_output(
1009        mut route: GroupedRouteStage<E>,
1010        folded: GroupedFoldStage,
1011    ) -> (GroupedCursorPage, Option<ExecutionTrace>) {
1012        let rows_returned = folded.page.rows.len();
1013        Self::finalize_path_outcome(
1014            &mut route.execution_trace,
1015            folded.optimization,
1016            folded.rows_scanned,
1017            rows_returned,
1018            folded.index_predicate_applied,
1019            folded.index_predicate_keys_rejected,
1020            folded.distinct_keys_deduped,
1021        );
1022
1023        let mut span = Span::<E>::new(ExecKind::Load);
1024        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
1025        if folded.check_filtered_rows_upper_bound {
1026            debug_assert!(
1027                folded.filtered_rows >= rows_returned,
1028                "grouped pagination must return at most filtered row cardinality",
1029            );
1030        }
1031
1032        (folded.page, route.execution_trace)
1033    }
1034
1035    // Map grouped reducer errors into executor-owned error classes.
1036    fn map_group_error(err: GroupError) -> InternalError {
1037        match err {
1038            GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
1039                InternalError::executor_internal(err.to_string())
1040            }
1041            GroupError::Internal(inner) => inner,
1042        }
1043    }
1044
1045    // Resolve whether this grouped shape is the supported global DISTINCT
1046    // field-target aggregate contract (`COUNT` or `SUM` with zero group keys).
1047    fn global_distinct_field_aggregate_spec(
1048        group_fields: &[crate::db::query::plan::FieldSlot],
1049        aggregates: &[GroupAggregateSpec],
1050        having: Option<&GroupHavingSpec>,
1051    ) -> Result<Option<(AggregateKind, String)>, InternalError> {
1052        if !group_fields.is_empty() {
1053            return Ok(None);
1054        }
1055        if aggregates.is_empty() {
1056            return Ok(None);
1057        }
1058        if aggregates
1059            .iter()
1060            .all(|aggregate| aggregate.target_field().is_none())
1061        {
1062            return Ok(None);
1063        }
1064        if having.is_some() {
1065            return Err(InternalError::query_executor_invariant(
1066                "global DISTINCT grouped aggregate shape does not support HAVING",
1067            ));
1068        }
1069        if aggregates.len() != 1 {
1070            return Err(InternalError::query_executor_invariant(
1071                "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
1072            ));
1073        }
1074
1075        let aggregate = &aggregates[0];
1076        let Some(target_field) = aggregate.target_field() else {
1077            return Err(InternalError::query_executor_invariant(
1078                "global DISTINCT grouped aggregate shape requires field-target aggregate",
1079            ));
1080        };
1081        if !aggregate.distinct() {
1082            return Err(InternalError::query_executor_invariant(
1083                "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
1084            ));
1085        }
1086        if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
1087            return Err(InternalError::query_executor_invariant(format!(
1088                "unsupported global DISTINCT grouped aggregate kind: {:?}",
1089                aggregate.kind()
1090            )));
1091        }
1092
1093        Ok(Some((aggregate.kind(), target_field.to_string())))
1094    }
1095
1096    // Execute one global DISTINCT field-target grouped aggregate with grouped
1097    // distinct budget accounting and deterministic reducer behavior.
1098    fn execute_global_distinct_field_aggregate(
1099        plan: &AccessPlannedQuery<E::Key>,
1100        ctx: &Context<'_, E>,
1101        resolved: &mut ResolvedExecutionKeyStream,
1102        compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
1103        grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
1104        aggregate_spec: (AggregateKind, &str),
1105        row_counters: (&mut usize, &mut usize),
1106    ) -> Result<GroupedRow, InternalError> {
1107        let (aggregate_kind, target_field) = aggregate_spec;
1108        let (scanned_rows, filtered_rows) = row_counters;
1109        let field_slot = if aggregate_kind.is_sum() {
1110            Self::resolve_numeric_field_slot(target_field)?
1111        } else {
1112            Self::resolve_any_field_slot(target_field)?
1113        };
1114        let mut distinct_values = GroupKeySet::new();
1115        let mut count = 0u32;
1116        let mut sum = Decimal::ZERO;
1117        let mut saw_sum_value = false;
1118
1119        grouped_execution_context
1120            .record_implicit_single_group::<E>()
1121            .map_err(Self::map_group_error)?;
1122
1123        while let Some(key) = resolved.key_stream.next_key()? {
1124            let row = match plan.scalar_plan().consistency {
1125                MissingRowPolicy::Error => ctx.read_strict(&key),
1126                MissingRowPolicy::Ignore => ctx.read(&key),
1127            };
1128            let row = match row {
1129                Ok(row) => row,
1130                Err(err) if err.is_not_found() => continue,
1131                Err(err) => return Err(err),
1132            };
1133            *scanned_rows = scanned_rows.saturating_add(1);
1134            let (_, entity) = Context::<E>::deserialize_row((key, row))?;
1135            if let Some(compiled_predicate) = compiled_predicate
1136                && !compiled_predicate.eval(&entity)
1137            {
1138                continue;
1139            }
1140            *filtered_rows = filtered_rows.saturating_add(1);
1141
1142            let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
1143                .map_err(AggregateFieldValueError::into_internal_error)?;
1144            let distinct_key = distinct_value
1145                .canonical_key()
1146                .map_err(KeyCanonicalError::into_internal_error)?;
1147            let distinct_admitted = grouped_execution_context
1148                .admit_distinct_key(
1149                    &mut distinct_values,
1150                    grouped_execution_context
1151                        .config()
1152                        .max_distinct_values_per_group(),
1153                    distinct_key,
1154                )
1155                .map_err(Self::map_group_error)?;
1156            if !distinct_admitted {
1157                continue;
1158            }
1159
1160            if aggregate_kind.is_sum() {
1161                let numeric_value =
1162                    extract_numeric_field_decimal(&entity, target_field, field_slot)
1163                        .map_err(AggregateFieldValueError::into_internal_error)?;
1164                sum += numeric_value;
1165                saw_sum_value = true;
1166            } else {
1167                count = count.saturating_add(1);
1168            }
1169        }
1170
1171        let aggregate_value = if aggregate_kind.is_sum() {
1172            if saw_sum_value {
1173                Value::Decimal(sum)
1174            } else {
1175                Value::Null
1176            }
1177        } else {
1178            Value::Uint(u64::from(count))
1179        };
1180
1181        Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
1182    }
1183
1184    // Apply grouped pagination semantics to the singleton global grouped row.
1185    fn page_global_distinct_grouped_row(
1186        row: GroupedRow,
1187        page: Option<&crate::db::query::plan::PageSpec>,
1188    ) -> Vec<GroupedRow> {
1189        let Some(page) = page else {
1190            return vec![row];
1191        };
1192        if page.offset > 0 || page.limit == Some(0) {
1193            return Vec::new();
1194        }
1195
1196        vec![row]
1197    }
1198
1199    // Convert one aggregate output payload into grouped response value payload.
1200    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1201        match output {
1202            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1203            AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1204            AggregateOutput::Exists(value) => Value::Bool(*value),
1205            AggregateOutput::Min(value)
1206            | AggregateOutput::Max(value)
1207            | AggregateOutput::First(value)
1208            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1209        }
1210    }
1211
1212    // Evaluate grouped HAVING clauses on one finalized grouped output row.
1213    fn group_matches_having(
1214        having: &GroupHavingSpec,
1215        group_fields: &[crate::db::query::plan::FieldSlot],
1216        group_key_value: &Value,
1217        aggregate_values: &[Value],
1218    ) -> Result<bool, InternalError> {
1219        for (index, clause) in having.clauses().iter().enumerate() {
1220            let actual = match clause.symbol() {
1221                GroupHavingSymbol::GroupField(field_slot) => {
1222                    let group_key_list = match group_key_value {
1223                        Value::List(values) => values,
1224                        value => {
1225                            return Err(InternalError::query_executor_invariant(format!(
1226                                "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1227                            )));
1228                        }
1229                    };
1230                    let Some(group_field_offset) = group_fields
1231                        .iter()
1232                        .position(|group_field| group_field.index() == field_slot.index())
1233                    else {
1234                        return Err(InternalError::query_executor_invariant(format!(
1235                            "grouped HAVING field is not in grouped key projection: field='{}'",
1236                            field_slot.field()
1237                        )));
1238                    };
1239                    group_key_list.get(group_field_offset).ok_or_else(|| {
1240                        InternalError::query_executor_invariant(format!(
1241                            "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1242                            group_key_list.len()
1243                        ))
1244                    })?
1245                }
1246                GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1247                    aggregate_values.get(*aggregate_index).ok_or_else(|| {
1248                        InternalError::query_executor_invariant(format!(
1249                            "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1250                            aggregate_values.len()
1251                        ))
1252                    })?
1253                }
1254            };
1255
1256            if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1257                return Ok(false);
1258            }
1259        }
1260
1261        Ok(true)
1262    }
1263
1264    // Evaluate one grouped HAVING compare operator using strict value semantics.
1265    fn having_compare_values(
1266        actual: &Value,
1267        op: CompareOp,
1268        expected: &Value,
1269    ) -> Result<bool, InternalError> {
1270        let strict = CoercionSpec::default();
1271        let matches = match op {
1272            CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1273            CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1274            CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1275            CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1276            CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1277            CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1278            CompareOp::In
1279            | CompareOp::NotIn
1280            | CompareOp::Contains
1281            | CompareOp::StartsWith
1282            | CompareOp::EndsWith => {
1283                return Err(InternalError::query_executor_invariant(format!(
1284                    "unsupported grouped HAVING operator reached executor: {op:?}"
1285                )));
1286            }
1287        };
1288
1289        Ok(matches)
1290    }
1291
1292    // Record shared observability outcome for any execution path.
1293    fn finalize_path_outcome(
1294        execution_trace: &mut Option<ExecutionTrace>,
1295        optimization: Option<ExecutionOptimization>,
1296        rows_scanned: usize,
1297        rows_returned: usize,
1298        index_predicate_applied: bool,
1299        index_predicate_keys_rejected: u64,
1300        distinct_keys_deduped: u64,
1301    ) {
1302        record_rows_scanned::<E>(rows_scanned);
1303        if let Some(execution_trace) = execution_trace.as_mut() {
1304            execution_trace.set_path_outcome(
1305                optimization,
1306                rows_scanned,
1307                rows_returned,
1308                index_predicate_applied,
1309                index_predicate_keys_rejected,
1310                distinct_keys_deduped,
1311            );
1312            debug_assert_eq!(
1313                execution_trace.keys_scanned,
1314                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1315                "execution trace keys_scanned must match rows_scanned metrics input",
1316            );
1317        }
1318    }
1319
1320    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
1321    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1322        plan: &AccessPlannedQuery<E::Key>,
1323        cursor_boundary: Option<&CursorBoundary>,
1324    ) -> Result<(), InternalError> {
1325        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1326            return Ok(());
1327        }
1328        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1329
1330        Ok(())
1331    }
1332}