//! icydb_core/db/executor/load/mod.rs
//! Load executor module: scalar and grouped load execution with pagination cursors.

mod execute;
mod fast_stream;
mod index_range_limit;
mod page;
mod pk_stream;
mod secondary_index;
mod terminal;
mod trace;

pub(in crate::db::executor) use self::execute::{
    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
};

use self::trace::{access_path_variant, execution_order_direction};
use crate::{
    db::{
        Context, Db, GroupedRow,
        access::AccessPlan,
        contracts::canonical_value_compare,
        cursor::{
            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
            PlannedCursor, decode_pk_cursor_boundary,
        },
        data::DataKey,
        direction::Direction,
        executor::{
            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
            KeyOrderComparator, OrderedKeyStreamBox,
            aggregate::field::{
                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
            },
            aggregate::{
                AggregateOutput, FoldControl, GroupError,
                ensure_grouped_spec_supported_for_execution,
            },
            group::{
                CanonicalKey, grouped_budget_observability,
                grouped_execution_context_from_planner_config,
            },
            plan_metrics::{record_plan_metrics, record_rows_scanned},
            range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
        },
        index::IndexCompilePolicy,
        query::{
            plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
            policy,
        },
        response::Response,
    },
    error::InternalError,
    obs::sink::{ExecKind, Span},
    traits::{EntityKind, EntityValue},
    value::Value,
};
use std::{cmp::Ordering, marker::PhantomData};
57
58///
59/// PageCursor
60///
61/// Internal continuation cursor enum for scalar and grouped pagination.
62///
63#[derive(Clone, Debug, Eq, PartialEq)]
64pub(in crate::db) enum PageCursor {
65    Scalar(ContinuationToken),
66    Grouped(GroupedContinuationToken),
67}
68
69impl PageCursor {
70    #[must_use]
71    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
72        match self {
73            Self::Scalar(token) => Some(token),
74            Self::Grouped(_) => None,
75        }
76    }
77
78    #[must_use]
79    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
80        match self {
81            Self::Scalar(_) => None,
82            Self::Grouped(token) => Some(token),
83        }
84    }
85}
86
87impl From<ContinuationToken> for PageCursor {
88    fn from(value: ContinuationToken) -> Self {
89        Self::Scalar(value)
90    }
91}
92
93impl From<GroupedContinuationToken> for PageCursor {
94    fn from(value: GroupedContinuationToken) -> Self {
95        Self::Grouped(value)
96    }
97}
98
99///
100/// CursorPage
101///
102/// Internal load page result with continuation cursor payload.
103/// Returned by paged executor entrypoints.
104///
105
106#[derive(Debug)]
107pub(crate) struct CursorPage<E: EntityKind> {
108    pub(crate) items: Response<E>,
109
110    pub(crate) next_cursor: Option<PageCursor>,
111}
112
113///
114/// GroupedCursorPage
115///
116/// Internal grouped page result with grouped rows and continuation cursor payload.
117///
118#[derive(Debug)]
119pub(in crate::db) struct GroupedCursorPage {
120    pub(in crate::db) rows: Vec<GroupedRow>,
121    pub(in crate::db) next_cursor: Option<PageCursor>,
122}
123
124///
125/// ExecutionAccessPathVariant
126///
127/// Coarse access path shape used by the load execution trace surface.
128///
129
130#[derive(Clone, Copy, Debug, Eq, PartialEq)]
131pub enum ExecutionAccessPathVariant {
132    ByKey,
133    ByKeys,
134    KeyRange,
135    IndexPrefix,
136    IndexRange,
137    FullScan,
138    Union,
139    Intersection,
140}
141
142///
143/// ExecutionOptimization
144///
145/// Canonical load optimization selected by execution, if any.
146///
147
148#[derive(Clone, Copy, Debug, Eq, PartialEq)]
149pub enum ExecutionOptimization {
150    PrimaryKey,
151    SecondaryOrderPushdown,
152    IndexRangeLimitPushdown,
153}
154
155///
156/// ExecutionTrace
157///
158/// Structured, opt-in load execution introspection snapshot.
159/// Captures plan-shape and execution decisions without changing semantics.
160///
161
162#[derive(Clone, Copy, Debug, Eq, PartialEq)]
163pub struct ExecutionTrace {
164    pub access_path_variant: ExecutionAccessPathVariant,
165    pub direction: OrderDirection,
166    pub optimization: Option<ExecutionOptimization>,
167    pub keys_scanned: u64,
168    pub rows_returned: u64,
169    pub continuation_applied: bool,
170    pub index_predicate_applied: bool,
171    pub index_predicate_keys_rejected: u64,
172    pub distinct_keys_deduped: u64,
173}
174
175impl ExecutionTrace {
176    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
177        Self {
178            access_path_variant: access_path_variant(access),
179            direction: execution_order_direction(direction),
180            optimization: None,
181            keys_scanned: 0,
182            rows_returned: 0,
183            continuation_applied,
184            index_predicate_applied: false,
185            index_predicate_keys_rejected: 0,
186            distinct_keys_deduped: 0,
187        }
188    }
189
190    fn set_path_outcome(
191        &mut self,
192        optimization: Option<ExecutionOptimization>,
193        keys_scanned: usize,
194        rows_returned: usize,
195        index_predicate_applied: bool,
196        index_predicate_keys_rejected: u64,
197        distinct_keys_deduped: u64,
198    ) {
199        self.optimization = optimization;
200        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
201        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
202        self.index_predicate_applied = index_predicate_applied;
203        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
204        self.distinct_keys_deduped = distinct_keys_deduped;
205    }
206}
207
// Build the canonical key-order comparator for one scan direction.
// Thin delegation kept as a named executor-local entry point.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
213
214///
215/// FastPathKeyResult
216///
217/// Internal fast-path access result.
218/// Carries ordered keys plus observability metadata for shared execution phases.
219///
220
221pub(in crate::db::executor) struct FastPathKeyResult {
222    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
223    pub(in crate::db::executor) rows_scanned: usize,
224    pub(in crate::db::executor) optimization: ExecutionOptimization,
225}
226
227///
228/// LoadExecutor
229///
230/// Load-plan executor with canonical post-access semantics.
231/// Coordinates fast paths, trace hooks, and pagination cursors.
232///
233
234#[derive(Clone)]
235pub(crate) struct LoadExecutor<E: EntityKind> {
236    db: Db<E::Canister>,
237    debug: bool,
238    _marker: PhantomData<E>,
239}
240
241impl<E> LoadExecutor<E>
242where
243    E: EntityKind + EntityValue,
244{
    // Construct a load executor; `debug` enables opt-in execution tracing.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
253
    // Recover canonical read context for kernel-owned execution setup.
    // Thin delegation to the database handle, kept on the executor surface.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }
260
261    // Resolve one orderable aggregate target field into a stable slot with
262    // canonical field-error taxonomy mapping.
263    pub(in crate::db::executor) fn resolve_orderable_field_slot(
264        target_field: &str,
265    ) -> Result<FieldSlot, InternalError> {
266        resolve_orderable_aggregate_target_slot::<E>(target_field)
267            .map_err(AggregateFieldValueError::into_internal_error)
268    }
269
270    // Resolve one aggregate target field into a stable slot with canonical
271    // field-error taxonomy mapping.
272    pub(in crate::db::executor) fn resolve_any_field_slot(
273        target_field: &str,
274    ) -> Result<FieldSlot, InternalError> {
275        resolve_any_aggregate_target_slot::<E>(target_field)
276            .map_err(AggregateFieldValueError::into_internal_error)
277    }
278
279    // Resolve one numeric aggregate target field into a stable slot with
280    // canonical field-error taxonomy mapping.
281    pub(in crate::db::executor) fn resolve_numeric_field_slot(
282        target_field: &str,
283    ) -> Result<FieldSlot, InternalError> {
284        resolve_numeric_aggregate_target_slot::<E>(target_field)
285            .map_err(AggregateFieldValueError::into_internal_error)
286    }
287
288    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
289        self.execute_paged_with_cursor(plan, PlannedCursor::none())
290            .map(|page| page.items)
291    }
292
293    pub(in crate::db) fn execute_paged_with_cursor(
294        &self,
295        plan: ExecutablePlan<E>,
296        cursor: impl Into<PlannedCursor>,
297    ) -> Result<CursorPage<E>, InternalError> {
298        self.execute_paged_with_cursor_traced(plan, cursor)
299            .map(|(page, _)| page)
300    }
301
    // Execute a scalar (non-grouped) load plan with an optional continuation
    // cursor, returning the page plus the opt-in execution trace.
    //
    // Routing and trace setup happen before the inner closure; the closure
    // scopes the fallible execution so the trace is returned alongside the
    // page only on success.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Grouped plans carry group-key cursors; route them to the grouped entrypoints.
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(&plan.as_inner().logical).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        // Any non-initial continuation mode means a cursor actually resumed this page.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace allocation is gated on the executor debug flag.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
404
405    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
406        &self,
407        plan: ExecutablePlan<E>,
408        cursor: impl Into<GroupedPlannedCursor>,
409    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
410        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
411            return Err(InternalError::query_executor_invariant(
412                "grouped execution requires grouped logical plans",
413            ));
414        }
415
416        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
417
418        self.execute_grouped_path(plan, cursor)
419    }
420
    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
    //
    // Pipeline: validate plan -> plan grouped route -> build one engine per
    // aggregate -> Phase 1 fold filtered rows into per-group states ->
    // Phase 2 finalize and align outputs by declared aggregate order ->
    // Phase 3 apply resume/offset/limit and emit the continuation token.
    #[expect(clippy::too_many_lines)]
    fn execute_grouped_path(
        &self,
        plan: ExecutablePlan<E>,
        cursor: GroupedPlannedCursor,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        validate_executor_plan::<E>(plan.as_inner())?;
        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
        let grouped_execution = grouped_handoff.execution();
        let group_fields = grouped_handoff.group_fields().to_vec();
        let grouped_route_plan =
            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
        let grouped_route_observability =
            grouped_route_plan.grouped_observability().ok_or_else(|| {
                InternalError::query_executor_invariant(
                    "grouped route planning must emit grouped observability payload",
                )
            })?;
        let direction = grouped_route_plan.direction();
        // A non-empty grouped cursor means this call resumes a previous page.
        let continuation_applied = !cursor.is_empty();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();

        ensure_grouped_spec_supported_for_execution(
            grouped_handoff.group_fields(),
            grouped_handoff.aggregates(),
        )
        .map_err(|err| InternalError::executor_unsupported(err.to_string()))?;

        let mut grouped_execution_context =
            grouped_execution_context_from_planner_config(Some(grouped_execution));
        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
        debug_assert!(
            grouped_budget.max_groups() >= grouped_budget.groups()
                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
            "grouped budget observability invariants must hold at grouped route entry"
        );

        // Observe grouped route outcome/rejection once at grouped runtime entry.
        let grouped_route_outcome = grouped_route_observability.outcome();
        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
        let grouped_route_eligible = grouped_route_observability.eligible();
        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
        debug_assert!(
            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
            "grouped route eligibility and rejection reason must stay aligned",
        );
        debug_assert!(
            grouped_route_outcome
                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
                || grouped_route_rejection_reason.is_some(),
            "grouped rejected outcomes must carry a rejection reason",
        );
        debug_assert!(
            matches!(
                grouped_route_execution_mode,
                crate::db::executor::route::ExecutionMode::Materialized
            ),
            "grouped execution route must remain blocking/materialized",
        );
        // One grouped engine per declared aggregate; field-target aggregates
        // must have been rewritten away during planning.
        let mut grouped_engines = grouped_handoff
            .aggregates()
            .iter()
            .map(|aggregate| {
                if aggregate.target_field().is_some() {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped field-target aggregate reached executor after planning: {:?}",
                        aggregate.kind()
                    )));
                }

                Ok(grouped_execution_context.create_grouped_engine::<E>(
                    aggregate.kind(),
                    aggregate.kind().materialized_fold_direction(),
                ))
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Per-engine list of group keys whose fold already short-circuited.
        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        let mut span = Span::<E>::new(ExecKind::Load);
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: None,
                direction,
            },
            execution_preparation: &execution_preparation,
        };
        record_plan_metrics(&plan.access);
        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
            &execution_inputs,
            &grouped_route_plan,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let data_rows = ctx.rows_from_ordered_key_stream(
            resolved.key_stream.as_mut(),
            plan.scalar_plan().consistency,
        )?;
        let scanned_rows = data_rows.len();
        let mut rows = Context::<E>::deserialize_rows(data_rows)?;
        // Apply the compiled residual predicate when planning produced one.
        if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
            rows.retain(|row| compiled_predicate.eval(&row.1));
        }
        let filtered_rows = rows.len();

        // Phase 1: fold every filtered row into per-group aggregate states.
        for (id, entity) in &rows {
            let group_values = group_fields
                .iter()
                .map(|field| {
                    entity.get_value_by_index(field.index()).ok_or_else(|| {
                        InternalError::query_executor_invariant(format!(
                            "grouped field slot missing on entity: index={}",
                            field.index()
                        ))
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let group_key = Value::List(group_values)
                .canonical_key()
                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
            let canonical_group_value = group_key.canonical_value().clone();
            let data_key = DataKey::try_new::<E>(id.key())?;

            for (index, engine) in grouped_engines.iter_mut().enumerate() {
                // Skip groups this engine has already short-circuited on.
                if short_circuit_keys[index].iter().any(|done| {
                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
                }) {
                    continue;
                }

                let fold_control = engine
                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
                    .map_err(Self::map_group_error)?;
                if matches!(fold_control, FoldControl::Break) {
                    short_circuit_keys[index].push(canonical_group_value.clone());
                }
            }
        }

        // Phase 2: finalize grouped aggregate states and align outputs by declared aggregate order.
        let aggregate_count = grouped_engines.len();
        let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
        for (index, engine) in grouped_engines.into_iter().enumerate() {
            let finalized = engine.finalize_grouped()?;
            for output in finalized {
                let group_key = output.group_key().canonical_value().clone();
                let aggregate_value = Self::aggregate_output_to_value(output.output());
                if let Some((_, existing_aggregates)) =
                    grouped_rows_by_key
                        .iter_mut()
                        .find(|(existing_group_key, _)| {
                            canonical_value_compare(existing_group_key, &group_key)
                                == Ordering::Equal
                        })
                {
                    if let Some(slot) = existing_aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                } else {
                    // First sighting of this group: seed all slots with Null.
                    let mut aggregates = vec![Value::Null; aggregate_count];
                    if let Some(slot) = aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                    grouped_rows_by_key.push((group_key, aggregates));
                }
            }
        }
        // Canonical ascending group-key order drives pagination stability.
        grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));

        // Phase 3: apply grouped resume/offset/limit and build grouped continuation token.
        let initial_offset = plan
            .scalar_plan()
            .page
            .as_ref()
            .map_or(0, |page| page.offset);
        let resume_initial_offset = if cursor.is_empty() {
            initial_offset
        } else {
            cursor.initial_offset()
        };
        let resume_boundary = cursor
            .last_group_key()
            .map(|last_group_key| Value::List(last_group_key.to_vec()));
        // The plan-level offset applies only to the first page of a run.
        let apply_initial_offset = cursor.is_empty();
        let mut groups_skipped_for_offset = 0u32;
        let limit = plan
            .scalar_plan()
            .page
            .as_ref()
            .and_then(|page| page.limit)
            .and_then(|limit| usize::try_from(limit).ok());
        let mut page_rows = Vec::<GroupedRow>::new();
        let mut last_emitted_group_key: Option<Vec<Value>> = None;
        let mut has_more = false;
        for (group_key_value, aggregate_values) in grouped_rows_by_key {
            // Resume strictly after the last emitted group key.
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
            {
                continue;
            }
            if apply_initial_offset && groups_skipped_for_offset < initial_offset {
                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
                continue;
            }
            if limit.is_some_and(|limit| limit == 0) {
                break;
            }
            if let Some(limit) = limit
                && page_rows.len() >= limit
            {
                has_more = true;
                break;
            }

            let emitted_group_key = match group_key_value {
                Value::List(values) => values,
                value => {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped canonical key must be Value::List, found {value:?}"
                    )));
                }
            };
            last_emitted_group_key = Some(emitted_group_key.clone());
            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        }

        // Grouped continuation resumes at the last emitted group key; the sort
        // above is ascending, so the token direction is always Asc.
        let next_cursor = if has_more {
            last_emitted_group_key.map(|last_group_key| {
                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                    continuation_signature,
                    last_group_key,
                    Direction::Asc,
                    resume_initial_offset,
                ))
            })
        } else {
            None
        };
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
        debug_assert!(
            filtered_rows >= rows_returned,
            "grouped pagination must return at most filtered row cardinality",
        );

        Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor,
            },
            execution_trace,
        ))
    }
705
706    // Map grouped reducer errors into executor-owned error classes.
707    fn map_group_error(err: GroupError) -> InternalError {
708        match err {
709            GroupError::MemoryLimitExceeded { .. } => {
710                InternalError::executor_internal(err.to_string())
711            }
712            GroupError::Internal(inner) => inner,
713        }
714    }
715
716    // Convert one aggregate output payload into grouped response value payload.
717    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
718        match output {
719            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
720            AggregateOutput::Exists(value) => Value::Bool(*value),
721            AggregateOutput::Min(value)
722            | AggregateOutput::Max(value)
723            | AggregateOutput::First(value)
724            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
725        }
726    }
727
728    // Record shared observability outcome for any execution path.
729    fn finalize_path_outcome(
730        execution_trace: &mut Option<ExecutionTrace>,
731        optimization: Option<ExecutionOptimization>,
732        rows_scanned: usize,
733        rows_returned: usize,
734        index_predicate_applied: bool,
735        index_predicate_keys_rejected: u64,
736        distinct_keys_deduped: u64,
737    ) {
738        record_rows_scanned::<E>(rows_scanned);
739        if let Some(execution_trace) = execution_trace.as_mut() {
740            execution_trace.set_path_outcome(
741                optimization,
742                rows_scanned,
743                rows_returned,
744                index_predicate_applied,
745                index_predicate_keys_rejected,
746                distinct_keys_deduped,
747            );
748            debug_assert_eq!(
749                execution_trace.keys_scanned,
750                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
751                "execution trace keys_scanned must match rows_scanned metrics input",
752            );
753        }
754    }
755
756    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
757    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
758        plan: &AccessPlannedQuery<E::Key>,
759        cursor_boundary: Option<&CursorBoundary>,
760    ) -> Result<(), InternalError> {
761        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
762            return Ok(());
763        }
764        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
765
766        Ok(())
767    }
768}