// icydb_core/db/executor/load/mod.rs

1//! Module: executor::load
2//! Responsibility: load-path execution orchestration, pagination, and trace contracts.
3//! Does not own: logical planning semantics or relation/commit mutation policy.
4//! Boundary: consumes executable load plans and delegates post-access semantics to kernel.
5
6mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21    db::{
22        Context, Db, GroupedRow,
23        access::AccessPlan,
24        contracts::canonical_value_compare,
25        cursor::{
26            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27            PlannedCursor, decode_pk_cursor_boundary,
28        },
29        data::DataKey,
30        direction::Direction,
31        executor::{
32            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33            KeyOrderComparator, OrderedKeyStreamBox,
34            aggregate::field::{
35                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
36                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
37            },
38            aggregate::{AggregateOutput, FoldControl, GroupError},
39            group::{
40                CanonicalKey, grouped_budget_observability,
41                grouped_execution_context_from_planner_config,
42            },
43            plan_metrics::{record_plan_metrics, record_rows_scanned},
44            range_token_anchor_key, range_token_from_cursor_anchor,
45            route::aggregate_materialized_fold_direction,
46            validate_executor_plan,
47        },
48        index::IndexCompilePolicy,
49        query::plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
50        response::Response,
51    },
52    error::InternalError,
53    obs::sink::{ExecKind, Span},
54    traits::{EntityKind, EntityValue},
55    value::Value,
56};
57use std::{cmp::Ordering, marker::PhantomData};
58
///
/// PageCursor
///
/// Internal continuation cursor enum for scalar and grouped pagination.
/// Unifies the two token shapes so paged entrypoints can return one cursor type.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Continuation point for scalar (row-level) pagination.
    Scalar(ContinuationToken),
    /// Continuation point for grouped (group-level) pagination.
    Grouped(GroupedContinuationToken),
}
69
70impl PageCursor {
71    /// Borrow scalar continuation token when this cursor is scalar-shaped.
72    #[must_use]
73    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
74        match self {
75            Self::Scalar(token) => Some(token),
76            Self::Grouped(_) => None,
77        }
78    }
79
80    /// Borrow grouped continuation token when this cursor is grouped-shaped.
81    #[must_use]
82    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
83        match self {
84            Self::Scalar(_) => None,
85            Self::Grouped(token) => Some(token),
86        }
87    }
88}
89
90impl From<ContinuationToken> for PageCursor {
91    fn from(value: ContinuationToken) -> Self {
92        Self::Scalar(value)
93    }
94}
95
96impl From<GroupedContinuationToken> for PageCursor {
97    fn from(value: GroupedContinuationToken) -> Self {
98        Self::Grouped(value)
99    }
100}
101
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized entities for this load window.
    pub(crate) items: Response<E>,

    /// Cursor resuming after this page; `None` when the result set is exhausted.
    pub(crate) next_cursor: Option<PageCursor>,
}
115
///
/// GroupedCursorPage
///
/// Internal grouped page result with grouped rows and continuation cursor payload.
///
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    /// Grouped rows emitted for this page (ascending canonical group-key order).
    pub(in crate::db) rows: Vec<GroupedRow>,
    /// Cursor resuming after this page; `None` when all groups were emitted.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
126
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    /// Single primary-key lookup.
    ByKey,
    /// Batched primary-key lookups.
    ByKeys,
    /// Primary-key range scan.
    KeyRange,
    /// Secondary-index prefix scan.
    IndexPrefix,
    /// Secondary-index range scan.
    IndexRange,
    /// Unindexed full scan.
    FullScan,
    /// Union of multiple access paths.
    Union,
    /// Intersection of multiple access paths.
    Intersection,
}
144
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    /// Primary-key fast path.
    PrimaryKey,
    /// Ordering satisfied by secondary-index traversal.
    SecondaryOrderPushdown,
    /// Limit pushed down into an index-range scan.
    IndexRangeLimitPushdown,
}
157
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse shape of the access path the plan selected.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Traversal direction used by this execution.
    pub direction: OrderDirection,
    /// Fast-path optimization chosen by execution, if any.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned while producing the page (saturates at `u64::MAX`).
    pub keys_scanned: u64,
    /// Rows returned in the final page (saturates at `u64::MAX`).
    pub rows_returned: u64,
    /// True when execution resumed from a continuation cursor.
    pub continuation_applied: bool,
    /// True when an index-level predicate filtered keys during the scan.
    pub index_predicate_applied: bool,
    /// Count of keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of duplicate keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
177
178impl ExecutionTrace {
179    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
180        Self {
181            access_path_variant: access_path_variant(access),
182            direction: execution_order_direction(direction),
183            optimization: None,
184            keys_scanned: 0,
185            rows_returned: 0,
186            continuation_applied,
187            index_predicate_applied: false,
188            index_predicate_keys_rejected: 0,
189            distinct_keys_deduped: 0,
190        }
191    }
192
193    fn set_path_outcome(
194        &mut self,
195        optimization: Option<ExecutionOptimization>,
196        keys_scanned: usize,
197        rows_returned: usize,
198        index_predicate_applied: bool,
199        index_predicate_keys_rejected: u64,
200        distinct_keys_deduped: u64,
201    ) {
202        self.optimization = optimization;
203        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
204        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
205        self.index_predicate_applied = index_predicate_applied;
206        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
207        self.distinct_keys_deduped = distinct_keys_deduped;
208    }
209}
210
/// Resolve key-stream comparator contract from runtime direction.
///
/// Thin `const` adapter over `KeyOrderComparator::from_direction` so call
/// sites in this module go through one executor-owned entry point.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
217
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

pub(in crate::db::executor) struct FastPathKeyResult {
    /// Keys already in final traversal order for downstream materialization.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the ordered key stream.
    pub(in crate::db::executor) rows_scanned: usize,
    /// Fast-path optimization that produced this result.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
230
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle for the canister owning entity `E`.
    db: Db<E::Canister>,
    /// When true, traced entrypoints capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Zero-sized binder tying this executor to entity type `E`.
    _marker: PhantomData<E>,
}
244
245impl<E> LoadExecutor<E>
246where
247    E: EntityKind + EntityValue,
248{
249    /// Construct one load executor bound to a database handle and debug mode.
250    #[must_use]
251    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
252        Self {
253            db,
254            debug,
255            _marker: PhantomData,
256        }
257    }
258
259    /// Recover one canonical read context for kernel-owned execution setup.
260    pub(in crate::db::executor) fn recovered_context(
261        &self,
262    ) -> Result<crate::db::Context<'_, E>, InternalError> {
263        self.db.recovered_context::<E>()
264    }
265
266    // Resolve one orderable aggregate target field into a stable slot with
267    // canonical field-error taxonomy mapping.
268    pub(in crate::db::executor) fn resolve_orderable_field_slot(
269        target_field: &str,
270    ) -> Result<FieldSlot, InternalError> {
271        resolve_orderable_aggregate_target_slot::<E>(target_field)
272            .map_err(AggregateFieldValueError::into_internal_error)
273    }
274
275    // Resolve one aggregate target field into a stable slot with canonical
276    // field-error taxonomy mapping.
277    pub(in crate::db::executor) fn resolve_any_field_slot(
278        target_field: &str,
279    ) -> Result<FieldSlot, InternalError> {
280        resolve_any_aggregate_target_slot::<E>(target_field)
281            .map_err(AggregateFieldValueError::into_internal_error)
282    }
283
284    // Resolve one numeric aggregate target field into a stable slot with
285    // canonical field-error taxonomy mapping.
286    pub(in crate::db::executor) fn resolve_numeric_field_slot(
287        target_field: &str,
288    ) -> Result<FieldSlot, InternalError> {
289        resolve_numeric_aggregate_target_slot::<E>(target_field)
290            .map_err(AggregateFieldValueError::into_internal_error)
291    }
292
293    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
294        self.execute_paged_with_cursor(plan, PlannedCursor::none())
295            .map(|page| page.items)
296    }
297
298    pub(in crate::db) fn execute_paged_with_cursor(
299        &self,
300        plan: ExecutablePlan<E>,
301        cursor: impl Into<PlannedCursor>,
302    ) -> Result<CursorPage<E>, InternalError> {
303        self.execute_paged_with_cursor_traced(plan, cursor)
304            .map(|(page, _)| page)
305    }
306
    /// Execute one scalar (non-grouped) load page; when `self.debug` is set,
    /// also return an `ExecutionTrace` snapshot describing path decisions.
    ///
    /// # Errors
    /// Rejects grouped logical plans and non-load modes as executor invariants;
    /// otherwise propagates validation/execution `InternalError`s.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Grouped plans have a distinct pagination contract and entrypoint.
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        // Revalidate the caller-supplied cursor against this plan, then split
        // it into its row boundary and optional index-range anchor.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        // Route planning happens once, up front; execution below follows it.
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        // Any continuation mode other than Initial means we resumed a cursor.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace capture is opt-in via the executor's debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Immediately-invoked closure scopes `?` propagation so the captured
        // `execution_trace` is still available to pair with the page below.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
405
406    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
407        &self,
408        plan: ExecutablePlan<E>,
409        cursor: impl Into<GroupedPlannedCursor>,
410    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
411        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
412            return Err(InternalError::query_executor_invariant(
413                "grouped execution requires grouped logical plans",
414            ));
415        }
416
417        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
418
419        self.execute_grouped_path(plan, cursor)
420    }
421
    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
    //
    // Phases:
    //   0. Validate, build grouped route plan, and construct per-aggregate engines.
    //   1. Fold every filtered row into per-group aggregate states.
    //   2. Finalize engines and align outputs by declared aggregate order.
    //   3. Apply grouped resume/offset/limit and build the continuation token.
    #[expect(clippy::too_many_lines)]
    fn execute_grouped_path(
        &self,
        plan: ExecutablePlan<E>,
        cursor: GroupedPlannedCursor,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        validate_executor_plan::<E>(plan.as_inner())?;
        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
        let grouped_execution = grouped_handoff.execution();
        let group_fields = grouped_handoff.group_fields().to_vec();
        let grouped_route_plan =
            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
        let grouped_route_observability =
            grouped_route_plan.grouped_observability().ok_or_else(|| {
                InternalError::query_executor_invariant(
                    "grouped route planning must emit grouped observability payload",
                )
            })?;
        let direction = grouped_route_plan.direction();
        let continuation_applied = !cursor.is_empty();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();

        let mut grouped_execution_context =
            grouped_execution_context_from_planner_config(Some(grouped_execution));
        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
        debug_assert!(
            grouped_budget.max_groups() >= grouped_budget.groups()
                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
            "grouped budget observability invariants must hold at grouped route entry"
        );

        // Observe grouped route outcome/rejection once at grouped runtime entry.
        let grouped_route_outcome = grouped_route_observability.outcome();
        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
        let grouped_route_eligible = grouped_route_observability.eligible();
        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
        debug_assert!(
            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
            "grouped route eligibility and rejection reason must stay aligned",
        );
        debug_assert!(
            grouped_route_outcome
                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
                || grouped_route_rejection_reason.is_some(),
            "grouped rejected outcomes must carry a rejection reason",
        );
        debug_assert!(
            matches!(
                grouped_route_execution_mode,
                crate::db::executor::route::ExecutionMode::Materialized
            ),
            "grouped execution route must remain blocking/materialized",
        );
        // Field-target aggregates must be lowered during planning; only
        // whole-row grouped aggregates may reach the executor.
        let mut grouped_engines = grouped_handoff
            .aggregates()
            .iter()
            .map(|aggregate| {
                if aggregate.target_field().is_some() {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped field-target aggregate reached executor after planning: {:?}",
                        aggregate.kind()
                    )));
                }

                Ok(grouped_execution_context.create_grouped_engine::<E>(
                    aggregate.kind(),
                    aggregate_materialized_fold_direction(aggregate.kind()),
                ))
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Per-engine list of group keys whose fold already short-circuited
        // (FoldControl::Break); later rows for those keys are skipped.
        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        let mut span = Span::<E>::new(ExecKind::Load);
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: None,
                direction,
            },
            execution_preparation: &execution_preparation,
        };
        record_plan_metrics(&plan.access);
        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
            &execution_inputs,
            &grouped_route_plan,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let data_rows = ctx.rows_from_ordered_key_stream(
            resolved.key_stream.as_mut(),
            plan.scalar_plan().consistency,
        )?;
        let scanned_rows = data_rows.len();
        let mut rows = Context::<E>::deserialize_rows(data_rows)?;
        // Residual predicate (if compiled) filters rows before any grouping.
        if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
            rows.retain(|row| compiled_predicate.eval(&row.1));
        }
        let filtered_rows = rows.len();

        // Phase 1: fold every filtered row into per-group aggregate states.
        for (id, entity) in &rows {
            let group_values = group_fields
                .iter()
                .map(|field| {
                    entity.get_value_by_index(field.index()).ok_or_else(|| {
                        InternalError::query_executor_invariant(format!(
                            "grouped field slot missing on entity: index={}",
                            field.index()
                        ))
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let group_key = Value::List(group_values)
                .canonical_key()
                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
            let canonical_group_value = group_key.canonical_value().clone();
            let data_key = DataKey::try_new::<E>(id.key())?;

            for (index, engine) in grouped_engines.iter_mut().enumerate() {
                // Skip groups this engine has already short-circuited.
                if short_circuit_keys[index].iter().any(|done| {
                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
                }) {
                    continue;
                }

                let fold_control = engine
                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                    .map_err(Self::map_group_error)?;
                if matches!(fold_control, FoldControl::Break) {
                    short_circuit_keys[index].push(canonical_group_value.clone());
                }
            }
        }

        // Phase 2: finalize grouped aggregate states and align outputs by declared aggregate order.
        let aggregate_count = grouped_engines.len();
        let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
        for (index, engine) in grouped_engines.into_iter().enumerate() {
            let finalized = engine.finalize_grouped()?;
            for output in finalized {
                let group_key = output.group_key().canonical_value().clone();
                let aggregate_value = Self::aggregate_output_to_value(output.output());
                if let Some((_, existing_aggregates)) =
                    grouped_rows_by_key
                        .iter_mut()
                        .find(|(existing_group_key, _)| {
                            canonical_value_compare(existing_group_key, &group_key)
                                == Ordering::Equal
                        })
                {
                    if let Some(slot) = existing_aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                } else {
                    // First output for this group key: seed all slots Null,
                    // then set this aggregate's declared-order position.
                    let mut aggregates = vec![Value::Null; aggregate_count];
                    if let Some(slot) = aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                    grouped_rows_by_key.push((group_key, aggregates));
                }
            }
        }
        grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));

        // Phase 3: apply grouped resume/offset/limit and build grouped continuation token.
        let initial_offset = plan
            .scalar_plan()
            .page
            .as_ref()
            .map_or(0, |page| page.offset);
        let resume_initial_offset = if cursor.is_empty() {
            initial_offset
        } else {
            cursor.initial_offset()
        };
        let resume_boundary = cursor
            .last_group_key()
            .map(|last_group_key| Value::List(last_group_key.to_vec()));
        // The plan-declared offset applies only on the first page; resumed
        // pages position via the resume boundary instead.
        let apply_initial_offset = cursor.is_empty();
        let mut groups_skipped_for_offset = 0u32;
        let limit = plan
            .scalar_plan()
            .page
            .as_ref()
            .and_then(|page| page.limit)
            .and_then(|limit| usize::try_from(limit).ok());
        let mut page_rows = Vec::<GroupedRow>::new();
        let mut last_emitted_group_key: Option<Vec<Value>> = None;
        let mut has_more = false;
        for (group_key_value, aggregate_values) in grouped_rows_by_key {
            // Resume: drop groups at or before the last emitted boundary.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
            {
                continue;
            }
            if apply_initial_offset && groups_skipped_for_offset < initial_offset {
                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
                continue;
            }
            // A zero limit yields an empty page with no continuation.
            if limit.is_some_and(|limit| limit == 0) {
                break;
            }
            if let Some(limit) = limit
                && page_rows.len() >= limit
            {
                has_more = true;
                break;
            }

            let emitted_group_key = match group_key_value {
                Value::List(values) => values,
                value => {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped canonical key must be Value::List, found {value:?}"
                    )));
                }
            };
            last_emitted_group_key = Some(emitted_group_key.clone());
            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        }

        // Grouped pages are emitted in ascending canonical key order (sorted
        // above), so the continuation direction is fixed to Asc.
        let next_cursor = if has_more {
            last_emitted_group_key.map(|last_group_key| {
                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                    continuation_signature,
                    last_group_key,
                    Direction::Asc,
                    resume_initial_offset,
                ))
            })
        } else {
            None
        };
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
        debug_assert!(
            filtered_rows >= rows_returned,
            "grouped pagination must return at most filtered row cardinality",
        );

        Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor,
            },
            execution_trace,
        ))
    }
700
701    // Map grouped reducer errors into executor-owned error classes.
702    fn map_group_error(err: GroupError) -> InternalError {
703        match err {
704            GroupError::MemoryLimitExceeded { .. } => {
705                InternalError::executor_internal(err.to_string())
706            }
707            GroupError::Internal(inner) => inner,
708        }
709    }
710
711    // Convert one aggregate output payload into grouped response value payload.
712    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
713        match output {
714            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
715            AggregateOutput::Exists(value) => Value::Bool(*value),
716            AggregateOutput::Min(value)
717            | AggregateOutput::Max(value)
718            | AggregateOutput::First(value)
719            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
720        }
721    }
722
723    // Record shared observability outcome for any execution path.
724    fn finalize_path_outcome(
725        execution_trace: &mut Option<ExecutionTrace>,
726        optimization: Option<ExecutionOptimization>,
727        rows_scanned: usize,
728        rows_returned: usize,
729        index_predicate_applied: bool,
730        index_predicate_keys_rejected: u64,
731        distinct_keys_deduped: u64,
732    ) {
733        record_rows_scanned::<E>(rows_scanned);
734        if let Some(execution_trace) = execution_trace.as_mut() {
735            execution_trace.set_path_outcome(
736                optimization,
737                rows_scanned,
738                rows_returned,
739                index_predicate_applied,
740                index_predicate_keys_rejected,
741                distinct_keys_deduped,
742            );
743            debug_assert_eq!(
744                execution_trace.keys_scanned,
745                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
746                "execution trace keys_scanned must match rows_scanned metrics input",
747            );
748        }
749    }
750
751    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
752    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
753        plan: &AccessPlannedQuery<E::Key>,
754        cursor_boundary: Option<&CursorBoundary>,
755    ) -> Result<(), InternalError> {
756        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
757            return Ok(());
758        }
759        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
760
761        Ok(())
762    }
763}