// icydb_core/db/executor/load/mod.rs

1mod execute;
2mod fast_stream;
3mod index_range_limit;
4mod page;
5mod pk_stream;
6mod secondary_index;
7mod terminal;
8mod trace;
9
10pub(in crate::db::executor) use self::execute::{
11    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
12};
13
14use self::trace::{access_path_variant, execution_order_direction};
15use crate::{
16    db::{
17        Context, Db, GroupedRow,
18        access::AccessPlan,
19        contracts::canonical_value_compare,
20        cursor::{
21            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
22            PlannedCursor, decode_pk_cursor_boundary,
23        },
24        data::DataKey,
25        direction::Direction,
26        executor::{
27            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
28            KeyOrderComparator, OrderedKeyStreamBox,
29            aggregate::field::{
30                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
31                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
32            },
33            aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
34            group::{
35                CanonicalKey, grouped_budget_observability,
36                grouped_execution_context_from_planner_config,
37            },
38            plan_metrics::{record_plan_metrics, record_rows_scanned},
39            range_token_anchor_key, range_token_from_cursor_anchor, validate_executor_plan,
40        },
41        index::IndexCompilePolicy,
42        policy,
43        query::plan::{AccessPlannedQuery, LogicalPlan, OrderDirection, grouped_executor_handoff},
44        response::Response,
45    },
46    error::InternalError,
47    obs::sink::{ExecKind, Span},
48    traits::{EntityKind, EntityValue},
49    value::Value,
50};
51use std::{cmp::Ordering, marker::PhantomData, ops::Deref};
52
///
/// PageCursor
///
/// Internal continuation cursor enum for scalar and grouped pagination.
/// Scalar tokens resume row-level pagination; grouped tokens resume at a
/// group-key boundary emitted by the grouped executor entrypoints.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    Scalar(ContinuationToken),
    Grouped(GroupedContinuationToken),
}
63
64impl PageCursor {
65    #[must_use]
66    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
67        match self {
68            Self::Scalar(token) => Some(token),
69            Self::Grouped(_) => None,
70        }
71    }
72
73    #[must_use]
74    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
75        match self {
76            Self::Scalar(_) => None,
77            Self::Grouped(token) => Some(token),
78        }
79    }
80
81    // Preserve scalar cursor call-site compatibility for existing scalar tests.
82    fn scalar_or_panic(&self) -> &ContinuationToken {
83        match self {
84            Self::Scalar(token) => token,
85            Self::Grouped(_) => {
86                panic!("grouped continuation cursor cannot be accessed as scalar token")
87            }
88        }
89    }
90}
91
// Deref to the scalar token for legacy call-site compatibility.
// WARNING: this panics when the cursor is the `Grouped` variant; code paths
// that may carry grouped cursors must use `as_scalar`/`as_grouped` instead
// of implicit deref.
impl Deref for PageCursor {
    type Target = ContinuationToken;

    fn deref(&self) -> &Self::Target {
        self.scalar_or_panic()
    }
}
99
// Lift a scalar continuation token into the unified cursor enum.
impl From<ContinuationToken> for PageCursor {
    fn from(value: ContinuationToken) -> Self {
        Self::Scalar(value)
    }
}
105
// Lift a grouped continuation token into the unified cursor enum.
impl From<GroupedContinuationToken> for PageCursor {
    fn from(value: GroupedContinuationToken) -> Self {
        Self::Grouped(value)
    }
}
111
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    // Rows materialized for this page.
    pub(crate) items: Response<E>,

    // Continuation for the next page; `None` when the result set is exhausted.
    pub(crate) next_cursor: Option<PageCursor>,
}
125
///
/// GroupedCursorPage
///
/// Internal grouped page result with grouped rows and continuation cursor payload.
///
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    // One row per emitted group: group key values plus aligned aggregate values.
    pub(in crate::db) rows: Vec<GroupedRow>,
    // Grouped continuation for the next page; `None` when no further groups remain.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
136
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    // Single primary-key point lookup.
    ByKey,
    // Multi-key primary-key lookup.
    ByKeys,
    // Primary-key range scan.
    KeyRange,
    // Secondary-index prefix scan.
    IndexPrefix,
    // Secondary-index range scan.
    IndexRange,
    // Unindexed full table scan.
    FullScan,
    // Union of multiple access paths.
    Union,
    // Intersection of multiple access paths.
    Intersection,
}
154
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    // Primary-key fast path.
    PrimaryKey,
    // Order-by satisfied directly by secondary-index order.
    SecondaryOrderPushdown,
    // Limit pushed down into an index range scan.
    IndexRangeLimitPushdown,
}
167
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
/// Only populated when the executor is constructed with `debug = true`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    // Coarse shape of the chosen access path.
    pub access_path_variant: ExecutionAccessPathVariant,
    // Logical order direction of the executed stream.
    pub direction: OrderDirection,
    // Fast-path optimization taken, if any.
    pub optimization: Option<ExecutionOptimization>,
    // Keys scanned by access; saturated at `u64::MAX` on conversion overflow.
    pub keys_scanned: u64,
    // Rows returned after post-access filtering/pagination; saturated likewise.
    pub rows_returned: u64,
    // Whether a continuation cursor influenced this execution.
    pub continuation_applied: bool,
    // Whether a compiled index predicate filtered keys during access.
    pub index_predicate_applied: bool,
    // Keys rejected by the index predicate.
    pub index_predicate_keys_rejected: u64,
    // Keys removed by distinct-key deduplication.
    pub distinct_keys_deduped: u64,
}
187
188impl ExecutionTrace {
189    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190        Self {
191            access_path_variant: access_path_variant(access),
192            direction: execution_order_direction(direction),
193            optimization: None,
194            keys_scanned: 0,
195            rows_returned: 0,
196            continuation_applied,
197            index_predicate_applied: false,
198            index_predicate_keys_rejected: 0,
199            distinct_keys_deduped: 0,
200        }
201    }
202
203    fn set_path_outcome(
204        &mut self,
205        optimization: Option<ExecutionOptimization>,
206        keys_scanned: usize,
207        rows_returned: usize,
208        index_predicate_applied: bool,
209        index_predicate_keys_rejected: u64,
210        distinct_keys_deduped: u64,
211    ) {
212        self.optimization = optimization;
213        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215        self.index_predicate_applied = index_predicate_applied;
216        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217        self.distinct_keys_deduped = distinct_keys_deduped;
218    }
219}
220
// Build the canonical key-order comparator for one scan direction.
// Thin const wrapper kept for executor-module call-site uniformity.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
226
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

pub(in crate::db::executor) struct FastPathKeyResult {
    // Keys already in canonical output order for the chosen direction.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    // Rows scanned while producing the stream (observability input).
    pub(in crate::db::executor) rows_scanned: usize,
    // Which fast-path optimization produced this result.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
239
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    // Canister-scoped database handle used to recover read contexts.
    db: Db<E::Canister>,
    // When true, traced entrypoints capture an `ExecutionTrace` snapshot.
    debug: bool,
    // Binds the executor to one entity kind without storing an `E`.
    _marker: PhantomData<E>,
}
253
impl<E> LoadExecutor<E>
where
    E: EntityKind + EntityValue,
{
    // Construct a load executor bound to one canister handle; `debug` enables
    // opt-in `ExecutionTrace` capture on the traced entrypoints.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }

    // Recover canonical read context for kernel-owned execution setup.
    pub(in crate::db::executor) fn recovered_context(
        &self,
    ) -> Result<crate::db::Context<'_, E>, InternalError> {
        self.db.recovered_context::<E>()
    }

    // Resolve one orderable aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    pub(in crate::db::executor) fn resolve_orderable_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one aggregate target field into a stable slot with canonical
    // field-error taxonomy mapping.
    pub(in crate::db::executor) fn resolve_any_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Resolve one numeric aggregate target field into a stable slot with
    // canonical field-error taxonomy mapping.
    pub(in crate::db::executor) fn resolve_numeric_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }

    // Execute a scalar load plan with no continuation cursor and return only
    // the materialized rows, discarding pagination metadata.
    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
        self.execute_paged_with_cursor(plan, PlannedCursor::none())
            .map(|page| page.items)
    }

    // Paged scalar execution without trace capture; delegates to the traced
    // variant and drops the trace.
    pub(in crate::db) fn execute_paged_with_cursor(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<CursorPage<E>, InternalError> {
        self.execute_paged_with_cursor_traced(plan, cursor)
            .map(|(page, _)| page)
    }

    // Paged scalar execution with optional `ExecutionTrace` capture.
    // Rejects grouped plans, revalidates the cursor against the plan, routes
    // execution once, then materializes one page via the execution kernel.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Grouped plans have their own pagination entrypoint; reject early.
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }
        debug_assert!(
            policy::validate_plan_shape(&plan.as_inner().logical).is_ok(),
            "load executor received a plan shape that bypassed planning validation",
        );

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        // `continuation_applied` is derived from the route, not from the raw
        // cursor, so the trace reflects what routing actually honored.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Closure scope keeps `?` propagation local so the trace captured
        // above can still be returned alongside the page on success.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }

    // Grouped paged execution entrypoint with optional trace capture.
    // Requires a grouped logical plan and a grouped cursor.
    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<GroupedPlannedCursor>,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped execution requires grouped logical plans",
            ));
        }

        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;

        self.execute_grouped_path(plan, cursor)
    }

    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
    #[expect(clippy::too_many_lines)]
    fn execute_grouped_path(
        &self,
        plan: ExecutablePlan<E>,
        cursor: GroupedPlannedCursor,
    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
        validate_executor_plan::<E>(plan.as_inner())?;
        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
        let grouped_execution = grouped_handoff.execution();
        let group_fields = grouped_handoff.group_fields().to_vec();
        let grouped_spec = Self::lower_grouped_spec_for_executor_contract(&grouped_handoff);
        let grouped_route_plan =
            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
        let grouped_route_observability =
            grouped_route_plan.grouped_observability().ok_or_else(|| {
                InternalError::query_executor_invariant(
                    "grouped route planning must emit grouped observability payload",
                )
            })?;
        let direction = grouped_route_plan.direction();
        let continuation_applied = !cursor.is_empty();
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();

        grouped_spec
            .ensure_supported_for_execution()
            .map_err(|err| InternalError::executor_unsupported(err.to_string()))?;

        let mut grouped_execution_context =
            grouped_execution_context_from_planner_config(Some(grouped_execution));
        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
        debug_assert!(
            grouped_budget.max_groups() >= grouped_budget.groups()
                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
            "grouped budget observability invariants must hold at grouped route entry"
        );

        // Observe grouped route outcome/rejection once at grouped runtime entry.
        let grouped_route_outcome = grouped_route_observability.outcome();
        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
        let grouped_route_eligible = grouped_route_observability.eligible();
        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
        debug_assert!(
            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
            "grouped route eligibility and rejection reason must stay aligned",
        );
        debug_assert!(
            grouped_route_outcome
                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
                || grouped_route_rejection_reason.is_some(),
            "grouped rejected outcomes must carry a rejection reason",
        );
        debug_assert!(
            matches!(
                grouped_route_execution_mode,
                crate::db::executor::route::ExecutionMode::Materialized
            ),
            "grouped execution route must remain blocking/materialized",
        );
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
        // One reducer state per declared aggregate; field-target aggregates
        // are explicitly rejected until enabled.
        let mut grouped_states = grouped_spec
            .aggregate_specs()
            .iter()
            .map(|spec| {
                if spec.target_field().is_some() {
                    return Err(InternalError::executor_unsupported(format!(
                        "grouped field-target aggregates are not yet enabled: {:?}",
                        spec.kind()
                    )));
                }

                Ok(grouped_execution_context.create_grouped_state::<E>(
                    spec.kind(),
                    Self::grouped_reduction_direction(spec.kind()),
                ))
            })
            .collect::<Result<Vec<_>, _>>()?;
        // Per-aggregate list of group keys whose fold has short-circuited
        // (FoldControl::Break); those groups are skipped on later rows.
        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_states.len()];

        let mut span = Span::<E>::new(ExecKind::Load);
        let ctx = self.db.recovered_context::<E>()?;
        let execution_inputs = ExecutionInputs {
            ctx: &ctx,
            plan: &plan,
            stream_bindings: AccessStreamBindings {
                index_prefix_specs: index_prefix_specs.as_slice(),
                index_range_specs: index_range_specs.as_slice(),
                index_range_anchor: None,
                direction,
            },
            execution_preparation: &execution_preparation,
        };
        record_plan_metrics(&plan.access);
        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
            &execution_inputs,
            &grouped_route_plan,
            IndexCompilePolicy::ConservativeSubset,
        )?;
        let data_rows = ctx.rows_from_ordered_key_stream(
            resolved.key_stream.as_mut(),
            plan.scalar_plan().consistency,
        )?;
        let scanned_rows = data_rows.len();
        let mut rows = Context::<E>::deserialize_rows(data_rows)?;
        if let Some(compiled_predicate) = execution_preparation.compiled_predicate() {
            rows.retain(|row| compiled_predicate.eval(&row.1));
        }
        let filtered_rows = rows.len();

        // Phase 1: fold every filtered row into per-group aggregate states.
        for (id, entity) in &rows {
            let group_values = group_fields
                .iter()
                .map(|field| {
                    entity.get_value_by_index(field.index()).ok_or_else(|| {
                        InternalError::query_executor_invariant(format!(
                            "grouped field slot missing on entity: index={}",
                            field.index()
                        ))
                    })
                })
                .collect::<Result<Vec<_>, _>>()?;
            let group_key = Value::List(group_values)
                .canonical_key()
                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
            let canonical_group_value = group_key.canonical_value().clone();
            let data_key = DataKey::try_new::<E>(id.key())?;

            for (index, state) in grouped_states.iter_mut().enumerate() {
                // Skip groups this aggregate has already short-circuited.
                if short_circuit_keys[index].iter().any(|done| {
                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
                }) {
                    continue;
                }

                let fold_control = state
                    .apply(group_key.clone(), &data_key, &mut grouped_execution_context)
                    .map_err(Self::map_group_error)?;
                if matches!(fold_control, FoldControl::Break) {
                    short_circuit_keys[index].push(canonical_group_value.clone());
                }
            }
        }

        // Phase 2: finalize grouped aggregate states and align outputs by declared aggregate order.
        // NOTE(review): group lookup below is a linear scan per output, i.e.
        // O(outputs * groups) — presumably acceptable because group cardinality
        // is budget-bounded upstream; confirm before raising group budgets.
        let aggregate_count = grouped_states.len();
        let mut grouped_rows_by_key = Vec::<(Value, Vec<Value>)>::new();
        for (index, state) in grouped_states.into_iter().enumerate() {
            let finalized = state.finalize();
            for output in finalized {
                let group_key = output.group_key().canonical_value().clone();
                let aggregate_value = Self::aggregate_output_to_value(output.output());
                if let Some((_, existing_aggregates)) =
                    grouped_rows_by_key
                        .iter_mut()
                        .find(|(existing_group_key, _)| {
                            canonical_value_compare(existing_group_key, &group_key)
                                == Ordering::Equal
                        })
                {
                    if let Some(slot) = existing_aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                } else {
                    // First output for this group: seed all aggregate slots
                    // with Null, then fill this aggregate's slot.
                    let mut aggregates = vec![Value::Null; aggregate_count];
                    if let Some(slot) = aggregates.get_mut(index) {
                        *slot = aggregate_value;
                    }
                    grouped_rows_by_key.push((group_key, aggregates));
                }
            }
        }
        grouped_rows_by_key.sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));

        // Phase 3: apply grouped resume/offset/limit and build grouped continuation token.
        let initial_offset = plan
            .scalar_plan()
            .page
            .as_ref()
            .map_or(0, |page| page.offset);
        let resume_initial_offset = if cursor.is_empty() {
            initial_offset
        } else {
            cursor.initial_offset()
        };
        let resume_boundary = cursor
            .last_group_key()
            .map(|last_group_key| Value::List(last_group_key.to_vec()));
        // The plan-level offset only applies on the first page; continuations
        // resume strictly after the recorded group-key boundary instead.
        let apply_initial_offset = cursor.is_empty();
        let mut groups_skipped_for_offset = 0u32;
        let limit = plan
            .scalar_plan()
            .page
            .as_ref()
            .and_then(|page| page.limit)
            .and_then(|limit| usize::try_from(limit).ok());
        let mut page_rows = Vec::<GroupedRow>::new();
        let mut last_emitted_group_key: Option<Vec<Value>> = None;
        let mut has_more = false;
        for (group_key_value, aggregate_values) in grouped_rows_by_key {
            if let Some(resume_boundary) = resume_boundary.as_ref()
                && canonical_value_compare(&group_key_value, resume_boundary) != Ordering::Greater
            {
                continue;
            }
            if apply_initial_offset && groups_skipped_for_offset < initial_offset {
                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
                continue;
            }
            if limit.is_some_and(|limit| limit == 0) {
                break;
            }
            if let Some(limit) = limit
                && page_rows.len() >= limit
            {
                has_more = true;
                break;
            }

            let emitted_group_key = match group_key_value {
                Value::List(values) => values,
                value => {
                    return Err(InternalError::query_executor_invariant(format!(
                        "grouped canonical key must be Value::List, found {value:?}"
                    )));
                }
            };
            last_emitted_group_key = Some(emitted_group_key.clone());
            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
        }

        // NOTE(review): the grouped continuation token always records
        // Direction::Asc even though `direction` was resolved by routing
        // above — presumably grouped pagination is ascending-only because
        // grouped_rows_by_key is sorted ascending; confirm against routing.
        let next_cursor = if has_more {
            last_emitted_group_key.map(|last_group_key| {
                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
                    continuation_signature,
                    last_group_key,
                    Direction::Asc,
                    resume_initial_offset,
                ))
            })
        } else {
            None
        };
        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
        let optimization = resolved.optimization;
        let index_predicate_applied = resolved.index_predicate_applied;
        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
        let distinct_keys_deduped = resolved
            .distinct_keys_deduped_counter
            .as_ref()
            .map_or(0, |counter| counter.get());
        let rows_returned = page_rows.len();

        Self::finalize_path_outcome(
            &mut execution_trace,
            optimization,
            rows_scanned,
            rows_returned,
            index_predicate_applied,
            index_predicate_keys_rejected,
            distinct_keys_deduped,
        );
        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
        debug_assert!(
            filtered_rows >= rows_returned,
            "grouped pagination must return at most filtered row cardinality",
        );

        Ok((
            GroupedCursorPage {
                rows: page_rows,
                next_cursor,
            },
            execution_trace,
        ))
    }

    // Map grouped reducer errors into executor-owned error classes.
    fn map_group_error(err: GroupError) -> InternalError {
        match err {
            GroupError::MemoryLimitExceeded { .. } => {
                InternalError::executor_internal(err.to_string())
            }
            GroupError::Internal(inner) => inner,
        }
    }

    // Use non-short-circuit directions for MIN/MAX grouped materialized folds.
    const fn grouped_reduction_direction(kind: AggregateKind) -> Direction {
        match kind {
            AggregateKind::Min => Direction::Desc,
            AggregateKind::Max
            | AggregateKind::Count
            | AggregateKind::Exists
            | AggregateKind::First
            | AggregateKind::Last => Direction::Asc,
        }
    }

    // Convert one aggregate output payload into grouped response value payload.
    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
        match output {
            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
            AggregateOutput::Exists(value) => Value::Bool(*value),
            AggregateOutput::Min(value)
            | AggregateOutput::Max(value)
            | AggregateOutput::First(value)
            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
        }
    }

    // Record shared observability outcome for any execution path.
    fn finalize_path_outcome(
        execution_trace: &mut Option<ExecutionTrace>,
        optimization: Option<ExecutionOptimization>,
        rows_scanned: usize,
        rows_returned: usize,
        index_predicate_applied: bool,
        index_predicate_keys_rejected: u64,
        distinct_keys_deduped: u64,
    ) {
        record_rows_scanned::<E>(rows_scanned);
        if let Some(execution_trace) = execution_trace.as_mut() {
            execution_trace.set_path_outcome(
                optimization,
                rows_scanned,
                rows_returned,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
            );
            debug_assert_eq!(
                execution_trace.keys_scanned,
                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
                "execution trace keys_scanned must match rows_scanned metrics input",
            );
        }
    }

    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
        plan: &AccessPlannedQuery<E::Key>,
        cursor_boundary: Option<&CursorBoundary>,
    ) -> Result<(), InternalError> {
        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
            return Ok(());
        }
        // Decode only for validation; the boundary value itself is unused here.
        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;

        Ok(())
    }
}