Skip to main content

icydb_core/db/executor/load/
mod.rs

1//! Module: executor::load
2//! Responsibility: load-path execution orchestration, pagination, and trace contracts.
3//! Does not own: logical planning semantics or relation/commit mutation policy.
4//! Boundary: consumes executable load plans and delegates post-access semantics to kernel.
5
6mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21    db::{
22        Context, Db, GroupedRow,
23        access::AccessPlan,
24        contracts::canonical_value_compare,
25        cursor::{
26            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27            PlannedCursor, decode_pk_cursor_boundary,
28        },
29        data::DataKey,
30        direction::Direction,
31        executor::{
32            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33            KeyOrderComparator, OrderedKeyStreamBox,
34            aggregate::field::{
35                AggregateFieldValueError, FieldSlot, resolve_any_aggregate_target_slot,
36                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
37            },
38            aggregate::{AggregateOutput, FoldControl, GroupError},
39            group::{
40                CanonicalKey, grouped_budget_observability,
41                grouped_execution_context_from_planner_config,
42            },
43            plan_metrics::{
44                GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
45                record_rows_scanned,
46            },
47            range_token_anchor_key, range_token_from_cursor_anchor,
48            route::aggregate_materialized_fold_direction,
49            validate_executor_plan,
50        },
51        index::IndexCompilePolicy,
52        predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
53        query::plan::{
54            AccessPlannedQuery, GroupHavingSpec, GroupHavingSymbol, LogicalPlan, OrderDirection,
55            grouped_executor_handoff,
56        },
57        response::Response,
58    },
59    error::InternalError,
60    obs::sink::{ExecKind, Span},
61    traits::{EntityKind, EntityValue},
62    value::Value,
63};
64use std::{cmp::Ordering, marker::PhantomData};
65
///
/// PageCursor
///
/// Internal continuation cursor enum for scalar and grouped pagination.
/// Wraps either continuation token shape behind one page-cursor type so paged
/// entrypoints can return a single cursor payload.
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub(in crate::db) enum PageCursor {
    /// Continuation token for scalar (non-grouped) pagination.
    Scalar(ContinuationToken),
    /// Continuation token for grouped pagination.
    Grouped(GroupedContinuationToken),
}
76
77impl PageCursor {
78    /// Borrow scalar continuation token when this cursor is scalar-shaped.
79    #[must_use]
80    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
81        match self {
82            Self::Scalar(token) => Some(token),
83            Self::Grouped(_) => None,
84        }
85    }
86
87    /// Borrow grouped continuation token when this cursor is grouped-shaped.
88    #[must_use]
89    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
90        match self {
91            Self::Scalar(_) => None,
92            Self::Grouped(token) => Some(token),
93        }
94    }
95}
96
97impl From<ContinuationToken> for PageCursor {
98    fn from(value: ContinuationToken) -> Self {
99        Self::Scalar(value)
100    }
101}
102
103impl From<GroupedContinuationToken> for PageCursor {
104    fn from(value: GroupedContinuationToken) -> Self {
105        Self::Grouped(value)
106    }
107}
108
///
/// CursorPage
///
/// Internal load page result with continuation cursor payload.
/// Returned by paged executor entrypoints.
///

#[derive(Debug)]
pub(crate) struct CursorPage<E: EntityKind> {
    /// Materialized rows for this page of the executed load plan.
    pub(crate) items: Response<E>,

    /// Continuation cursor for the next page; `None` when no further page is available.
    pub(crate) next_cursor: Option<PageCursor>,
}
122
///
/// GroupedCursorPage
///
/// Internal grouped page result with grouped rows and continuation cursor payload.
///
#[derive(Debug)]
pub(in crate::db) struct GroupedCursorPage {
    /// Grouped result rows for this page.
    pub(in crate::db) rows: Vec<GroupedRow>,
    /// Continuation cursor when more grouped rows remain; otherwise `None`.
    pub(in crate::db) next_cursor: Option<PageCursor>,
}
133
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Variants mirror the access-plan shapes classified by `trace::access_path_variant`.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
151
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// Surfaced through `ExecutionTrace::optimization` when tracing is enabled.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
164
///
/// ExecutionTrace
///
/// Structured, opt-in load execution introspection snapshot.
/// Captures plan-shape and execution decisions without changing semantics.
///

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ExecutionTrace {
    /// Coarse access path shape derived from the access plan.
    pub access_path_variant: ExecutionAccessPathVariant,
    /// Execution order direction derived from the runtime direction.
    pub direction: OrderDirection,
    /// Optimization selected by execution; `None` until execution reports one.
    pub optimization: Option<ExecutionOptimization>,
    /// Keys scanned during access; saturates at `u64::MAX` on conversion.
    pub keys_scanned: u64,
    /// Rows returned on the materialized page; saturates at `u64::MAX` on conversion.
    pub rows_returned: u64,
    /// Whether this execution resumed from a continuation cursor.
    pub continuation_applied: bool,
    /// Whether an index-level predicate was applied during access.
    pub index_predicate_applied: bool,
    /// Count of keys rejected by the index-level predicate.
    pub index_predicate_keys_rejected: u64,
    /// Count of duplicate keys removed by distinct handling.
    pub distinct_keys_deduped: u64,
}
184
185impl ExecutionTrace {
186    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
187        Self {
188            access_path_variant: access_path_variant(access),
189            direction: execution_order_direction(direction),
190            optimization: None,
191            keys_scanned: 0,
192            rows_returned: 0,
193            continuation_applied,
194            index_predicate_applied: false,
195            index_predicate_keys_rejected: 0,
196            distinct_keys_deduped: 0,
197        }
198    }
199
200    fn set_path_outcome(
201        &mut self,
202        optimization: Option<ExecutionOptimization>,
203        keys_scanned: usize,
204        rows_returned: usize,
205        index_predicate_applied: bool,
206        index_predicate_keys_rejected: u64,
207        distinct_keys_deduped: u64,
208    ) {
209        self.optimization = optimization;
210        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
211        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
212        self.index_predicate_applied = index_predicate_applied;
213        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
214        self.distinct_keys_deduped = distinct_keys_deduped;
215    }
216}
217
/// Resolve key-stream comparator contract from runtime direction.
///
/// Thin `const` delegation to `KeyOrderComparator::from_direction`.
pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
    direction: Direction,
) -> KeyOrderComparator {
    KeyOrderComparator::from_direction(direction)
}
224
///
/// FastPathKeyResult
///
/// Internal fast-path access result.
/// Carries ordered keys plus observability metadata for shared execution phases.
///

pub(in crate::db::executor) struct FastPathKeyResult {
    /// Key stream produced by the fast path, already in execution order.
    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
    /// Rows scanned while producing the key stream (observability metadata).
    pub(in crate::db::executor) rows_scanned: usize,
    /// Fast-path optimization that produced this result.
    pub(in crate::db::executor) optimization: ExecutionOptimization,
}
237
///
/// LoadExecutor
///
/// Load-plan executor with canonical post-access semantics.
/// Coordinates fast paths, trace hooks, and pagination cursors.
///

#[derive(Clone)]
pub(crate) struct LoadExecutor<E: EntityKind> {
    /// Database handle scoped to the entity's canister.
    db: Db<E::Canister>,
    /// When true, traced entrypoints capture an `ExecutionTrace` snapshot.
    debug: bool,
    /// Ties the executor to its entity kind without storing an entity value.
    _marker: PhantomData<E>,
}
251
252impl<E> LoadExecutor<E>
253where
254    E: EntityKind + EntityValue,
255{
    /// Construct one load executor bound to a database handle and debug mode.
    ///
    /// `debug` opts the executor into building `ExecutionTrace` snapshots on
    /// traced entrypoints; it does not change execution semantics.
    #[must_use]
    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
        Self {
            db,
            debug,
            _marker: PhantomData,
        }
    }
265
266    /// Recover one canonical read context for kernel-owned execution setup.
267    pub(in crate::db::executor) fn recovered_context(
268        &self,
269    ) -> Result<crate::db::Context<'_, E>, InternalError> {
270        self.db.recovered_context::<E>()
271    }
272
    /// Resolve one orderable aggregate target field into a stable slot with
    /// canonical field-error taxonomy mapping.
    ///
    /// # Errors
    /// Maps field-resolution failures through `AggregateFieldValueError`
    /// into the canonical `InternalError` taxonomy.
    pub(in crate::db::executor) fn resolve_orderable_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_orderable_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
281
    /// Resolve one aggregate target field into a stable slot with canonical
    /// field-error taxonomy mapping.
    ///
    /// # Errors
    /// Maps field-resolution failures through `AggregateFieldValueError`
    /// into the canonical `InternalError` taxonomy.
    pub(in crate::db::executor) fn resolve_any_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_any_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
290
    /// Resolve one numeric aggregate target field into a stable slot with
    /// canonical field-error taxonomy mapping.
    ///
    /// # Errors
    /// Maps field-resolution failures through `AggregateFieldValueError`
    /// into the canonical `InternalError` taxonomy.
    pub(in crate::db::executor) fn resolve_numeric_field_slot(
        target_field: &str,
    ) -> Result<FieldSlot, InternalError> {
        resolve_numeric_aggregate_target_slot::<E>(target_field)
            .map_err(AggregateFieldValueError::into_internal_error)
    }
299
300    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
301        self.execute_paged_with_cursor(plan, PlannedCursor::none())
302            .map(|page| page.items)
303    }
304
305    pub(in crate::db) fn execute_paged_with_cursor(
306        &self,
307        plan: ExecutablePlan<E>,
308        cursor: impl Into<PlannedCursor>,
309    ) -> Result<CursorPage<E>, InternalError> {
310        self.execute_paged_with_cursor_traced(plan, cursor)
311            .map(|(page, _)| page)
312    }
313
    /// Execute one paged scalar load and return the page plus an optional
    /// execution trace (built only when the executor was constructed with
    /// `debug == true`).
    ///
    /// Rejects grouped logical plans and non-load modes with
    /// `query_executor_invariant` errors before any access work.
    pub(in crate::db) fn execute_paged_with_cursor_traced(
        &self,
        plan: ExecutablePlan<E>,
        cursor: impl Into<PlannedCursor>,
    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
        // Grouped plans must go through the grouped pagination entrypoints.
        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
            return Err(InternalError::query_executor_invariant(
                "grouped plans require execute_grouped pagination entrypoints",
            ));
        }

        // Revalidate the caller's cursor against this plan before use.
        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
        let cursor_boundary = cursor.boundary().cloned();
        let index_range_token = cursor
            .index_range_anchor()
            .map(range_token_from_cursor_anchor);

        if !plan.mode().is_load() {
            return Err(InternalError::query_executor_invariant(
                "load executor requires load plans",
            ));
        }

        let continuation_signature = plan.continuation_signature();
        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
        let index_range_specs = plan.index_range_specs()?.to_vec();
        let route_plan = Self::build_execution_route_plan_for_load(
            plan.as_inner(),
            cursor_boundary.as_ref(),
            index_range_token.as_ref(),
            None,
        )?;
        // Any continuation mode other than `Initial` means we are resuming.
        let continuation_applied = !matches!(
            route_plan.continuation_mode(),
            crate::db::executor::route::ContinuationMode::Initial
        );
        let direction = route_plan.direction();
        debug_assert_eq!(
            route_plan.window().effective_offset,
            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
            "route window effective offset must match logical plan offset semantics",
        );
        // Trace is only constructed in debug mode; must happen before
        // `into_inner` consumes the plan wrapper below.
        let mut execution_trace = self
            .debug
            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
        let plan = plan.into_inner();
        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);

        // Closure scope so `execution_trace` can be reborrowed for the return.
        let result = (|| {
            let mut span = Span::<E>::new(ExecKind::Load);

            validate_executor_plan::<E>(&plan)?;
            let ctx = self.db.recovered_context::<E>()?;
            let execution_inputs = ExecutionInputs {
                ctx: &ctx,
                plan: &plan,
                stream_bindings: AccessStreamBindings {
                    index_prefix_specs: index_prefix_specs.as_slice(),
                    index_range_specs: index_range_specs.as_slice(),
                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
                    direction,
                },
                execution_preparation: &execution_preparation,
            };

            record_plan_metrics(&plan.access);
            // Plan execution routing once, then execute in canonical order.
            // Resolve one canonical key stream, then run shared page materialization/finalization.
            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
                &execution_inputs,
                &route_plan,
                cursor_boundary.as_ref(),
                continuation_signature,
                IndexCompilePolicy::ConservativeSubset,
            )?;
            let page = materialized.page;
            let rows_scanned = materialized.rows_scanned;
            let post_access_rows = materialized.post_access_rows;
            let optimization = materialized.optimization;
            let index_predicate_applied = materialized.index_predicate_applied;
            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
            let distinct_keys_deduped = materialized.distinct_keys_deduped;

            // Finalization records span rows and fills the trace counters.
            Ok(Self::finalize_execution(
                page,
                optimization,
                rows_scanned,
                post_access_rows,
                index_predicate_applied,
                index_predicate_keys_rejected,
                distinct_keys_deduped,
                &mut span,
                &mut execution_trace,
            ))
        })();

        result.map(|page| (page, execution_trace))
    }
412
413    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
414        &self,
415        plan: ExecutablePlan<E>,
416        cursor: impl Into<GroupedPlannedCursor>,
417    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
418        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
419            return Err(InternalError::query_executor_invariant(
420                "grouped execution requires grouped logical plans",
421            ));
422        }
423
424        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
425
426        self.execute_grouped_path(plan, cursor)
427    }
428
429    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
430    #[expect(clippy::too_many_lines)]
431    fn execute_grouped_path(
432        &self,
433        plan: ExecutablePlan<E>,
434        cursor: GroupedPlannedCursor,
435    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
436        validate_executor_plan::<E>(plan.as_inner())?;
437        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
438        let grouped_execution = grouped_handoff.execution();
439        let group_fields = grouped_handoff.group_fields().to_vec();
440        let grouped_having = grouped_handoff.having().cloned();
441        let grouped_route_plan =
442            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
443        let grouped_route_observability =
444            grouped_route_plan.grouped_observability().ok_or_else(|| {
445                InternalError::query_executor_invariant(
446                    "grouped route planning must emit grouped observability payload",
447                )
448            })?;
449        let direction = grouped_route_plan.direction();
450        let continuation_applied = !cursor.is_empty();
451        let mut execution_trace = self
452            .debug
453            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
454        let continuation_signature = plan.continuation_signature();
455        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
456        let index_range_specs = plan.index_range_specs()?.to_vec();
457
458        let mut grouped_execution_context =
459            grouped_execution_context_from_planner_config(Some(grouped_execution));
460        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
461        debug_assert!(
462            grouped_budget.max_groups() >= grouped_budget.groups()
463                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
464                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
465            "grouped budget observability invariants must hold at grouped route entry"
466        );
467
468        // Observe grouped route outcome/rejection once at grouped runtime entry.
469        let grouped_route_outcome = grouped_route_observability.outcome();
470        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
471        let grouped_route_eligible = grouped_route_observability.eligible();
472        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
473        let grouped_plan_metrics_strategy =
474            match grouped_route_observability.grouped_execution_strategy() {
475                crate::db::executor::route::GroupedExecutionStrategy::HashGroup => {
476                    GroupedPlanMetricsStrategy::HashMaterialized
477                }
478                crate::db::executor::route::GroupedExecutionStrategy::OrderedGroup => {
479                    GroupedPlanMetricsStrategy::OrderedStreaming
480                }
481            };
482        debug_assert!(
483            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
484            "grouped route eligibility and rejection reason must stay aligned",
485        );
486        debug_assert!(
487            grouped_route_outcome
488                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
489                || grouped_route_rejection_reason.is_some(),
490            "grouped rejected outcomes must carry a rejection reason",
491        );
492        debug_assert!(
493            matches!(
494                grouped_route_execution_mode,
495                crate::db::executor::route::ExecutionMode::Materialized
496            ),
497            "grouped execution route must remain blocking/materialized",
498        );
499        let mut grouped_engines = grouped_handoff
500            .aggregates()
501            .iter()
502            .map(|aggregate| {
503                if aggregate.target_field().is_some() {
504                    return Err(InternalError::query_executor_invariant(format!(
505                        "grouped field-target aggregate reached executor after planning: {:?}",
506                        aggregate.kind()
507                    )));
508                }
509
510                Ok(grouped_execution_context.create_grouped_engine::<E>(
511                    aggregate.kind(),
512                    aggregate_materialized_fold_direction(aggregate.kind()),
513                ))
514            })
515            .collect::<Result<Vec<_>, _>>()?;
516        let mut short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
517        let plan = plan.into_inner();
518        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
519
520        let mut span = Span::<E>::new(ExecKind::Load);
521        let ctx = self.db.recovered_context::<E>()?;
522        let execution_inputs = ExecutionInputs {
523            ctx: &ctx,
524            plan: &plan,
525            stream_bindings: AccessStreamBindings {
526                index_prefix_specs: index_prefix_specs.as_slice(),
527                index_range_specs: index_range_specs.as_slice(),
528                index_range_anchor: None,
529                direction,
530            },
531            execution_preparation: &execution_preparation,
532        };
533        record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
534        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
535            &execution_inputs,
536            &grouped_route_plan,
537            IndexCompilePolicy::ConservativeSubset,
538        )?;
539        let mut scanned_rows = 0usize;
540        let mut filtered_rows = 0usize;
541        let compiled_predicate = execution_preparation.compiled_predicate();
542
543        // Phase 1: stream key->row reads, decode, predicate filtering, and grouped folding.
544        while let Some(key) = resolved.key_stream.next_key()? {
545            let row = match plan.scalar_plan().consistency {
546                MissingRowPolicy::Error => ctx.read_strict(&key),
547                MissingRowPolicy::Ignore => ctx.read(&key),
548            };
549            let row = match row {
550                Ok(row) => row,
551                Err(err) if err.is_not_found() => continue,
552                Err(err) => return Err(err),
553            };
554            scanned_rows = scanned_rows.saturating_add(1);
555            let (id, entity) = Context::<E>::deserialize_row((key, row))?;
556            if let Some(compiled_predicate) = compiled_predicate
557                && !compiled_predicate.eval(&entity)
558            {
559                continue;
560            }
561            filtered_rows = filtered_rows.saturating_add(1);
562
563            let group_values = group_fields
564                .iter()
565                .map(|field| {
566                    entity.get_value_by_index(field.index()).ok_or_else(|| {
567                        InternalError::query_executor_invariant(format!(
568                            "grouped field slot missing on entity: index={}",
569                            field.index()
570                        ))
571                    })
572                })
573                .collect::<Result<Vec<_>, _>>()?;
574            let group_key = Value::List(group_values)
575                .canonical_key()
576                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
577            let canonical_group_value = group_key.canonical_value().clone();
578            let data_key = DataKey::try_new::<E>(id.key())?;
579
580            for (index, engine) in grouped_engines.iter_mut().enumerate() {
581                if short_circuit_keys[index].iter().any(|done| {
582                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
583                }) {
584                    continue;
585                }
586
587                let fold_control = engine
588                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
589                    .map_err(Self::map_group_error)?;
590                if matches!(fold_control, FoldControl::Break) {
591                    short_circuit_keys[index].push(canonical_group_value.clone());
592                }
593            }
594        }
595
596        // Phase 2: finalize grouped aggregates per terminal and iterate groups in lock-step.
597        //
598        // This avoids constructing one additional full grouped `(key, aggregates)` buffer
599        // prior to pagination; we page directly while walking finalized grouped outputs.
600        let aggregate_count = grouped_engines.len();
601        if aggregate_count == 0 {
602            return Err(InternalError::query_executor_invariant(
603                "grouped execution requires at least one aggregate terminal",
604            ));
605        }
606        let mut finalized_iters = grouped_engines
607            .into_iter()
608            .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
609            .collect::<Result<Vec<_>, _>>()?;
610        let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
611            InternalError::query_executor_invariant("missing grouped primary iterator")
612        })?;
613
614        // Phase 3: apply grouped resume/offset/limit while finalizing grouped outputs.
615        let initial_offset = plan
616            .scalar_plan()
617            .page
618            .as_ref()
619            .map_or(0, |page| page.offset);
620        let resume_initial_offset = if cursor.is_empty() {
621            initial_offset
622        } else {
623            cursor.initial_offset()
624        };
625        let resume_boundary = cursor
626            .last_group_key()
627            .map(|last_group_key| Value::List(last_group_key.to_vec()));
628        let apply_initial_offset = cursor.is_empty();
629        let limit = plan
630            .scalar_plan()
631            .page
632            .as_ref()
633            .and_then(|page| page.limit)
634            .and_then(|limit| usize::try_from(limit).ok());
635        let initial_offset_for_page = if apply_initial_offset {
636            usize::try_from(initial_offset).unwrap_or(usize::MAX)
637        } else {
638            0
639        };
640        let selection_bound = limit.and_then(|limit| {
641            limit
642                .checked_add(initial_offset_for_page)
643                .and_then(|count| count.checked_add(1))
644        });
645        let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
646        if limit.is_none_or(|limit| limit != 0) {
647            for primary_output in primary_iter.by_ref() {
648                let group_key_value = primary_output.group_key().canonical_value().clone();
649                let mut aggregate_values = Vec::with_capacity(aggregate_count);
650                aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
651                for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
652                    let sibling_output = sibling_iter.next().ok_or_else(|| {
653                        InternalError::query_executor_invariant(format!(
654                            "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
655                        ))
656                    })?;
657                    let sibling_group_key = sibling_output.group_key().canonical_value();
658                    if canonical_value_compare(sibling_group_key, &group_key_value)
659                        != Ordering::Equal
660                    {
661                        return Err(InternalError::query_executor_invariant(format!(
662                            "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
663                        )));
664                    }
665                    aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
666                }
667                debug_assert_eq!(
668                    aggregate_values.len(),
669                    aggregate_count,
670                    "grouped aggregate value alignment must preserve declared aggregate count",
671                );
672                if let Some(grouped_having) = grouped_having.as_ref()
673                    && !Self::group_matches_having(
674                        grouped_having,
675                        group_fields.as_slice(),
676                        &group_key_value,
677                        aggregate_values.as_slice(),
678                    )?
679                {
680                    continue;
681                }
682
683                if let Some(resume_boundary) = resume_boundary.as_ref()
684                    && canonical_value_compare(&group_key_value, resume_boundary)
685                        != Ordering::Greater
686                {
687                    continue;
688                }
689
690                // Keep only the smallest `offset + limit + 1` canonical grouped keys when
691                // paging is bounded so grouped LIMIT does not require one full grouped buffer.
692                if let Some(selection_bound) = selection_bound {
693                    match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
694                        canonical_value_compare(existing_key, &group_key_value)
695                    }) {
696                        Ok(_) => {
697                            return Err(InternalError::query_executor_invariant(format!(
698                                "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
699                            )));
700                        }
701                        Err(insert_index) => {
702                            grouped_candidate_rows
703                                .insert(insert_index, (group_key_value, aggregate_values));
704                            if grouped_candidate_rows.len() > selection_bound {
705                                let _ = grouped_candidate_rows.pop();
706                            }
707                        }
708                    }
709                } else {
710                    grouped_candidate_rows.push((group_key_value, aggregate_values));
711                }
712            }
713            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
714                if sibling_iter.next().is_some() {
715                    return Err(InternalError::query_executor_invariant(format!(
716                        "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
717                    )));
718                }
719            }
720            if selection_bound.is_none() {
721                grouped_candidate_rows
722                    .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
723            }
724        }
725
726        let mut page_rows = Vec::<GroupedRow>::new();
727        let mut last_emitted_group_key: Option<Vec<Value>> = None;
728        let mut has_more = false;
729        let mut groups_skipped_for_offset = 0usize;
730        for (group_key_value, aggregate_values) in grouped_candidate_rows {
731            if groups_skipped_for_offset < initial_offset_for_page {
732                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
733                continue;
734            }
735            if let Some(limit) = limit
736                && page_rows.len() >= limit
737            {
738                has_more = true;
739                break;
740            }
741
742            let emitted_group_key = match group_key_value {
743                Value::List(values) => values,
744                value => {
745                    return Err(InternalError::query_executor_invariant(format!(
746                        "grouped canonical key must be Value::List, found {value:?}"
747                    )));
748                }
749            };
750            last_emitted_group_key = Some(emitted_group_key.clone());
751            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
752        }
753
754        let next_cursor = if has_more {
755            last_emitted_group_key.map(|last_group_key| {
756                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
757                    continuation_signature,
758                    last_group_key,
759                    Direction::Asc,
760                    resume_initial_offset,
761                ))
762            })
763        } else {
764            None
765        };
766        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
767        let optimization = resolved.optimization;
768        let index_predicate_applied = resolved.index_predicate_applied;
769        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
770        let distinct_keys_deduped = resolved
771            .distinct_keys_deduped_counter
772            .as_ref()
773            .map_or(0, |counter| counter.get());
774        let rows_returned = page_rows.len();
775
776        Self::finalize_path_outcome(
777            &mut execution_trace,
778            optimization,
779            rows_scanned,
780            rows_returned,
781            index_predicate_applied,
782            index_predicate_keys_rejected,
783            distinct_keys_deduped,
784        );
785        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
786        debug_assert!(
787            filtered_rows >= rows_returned,
788            "grouped pagination must return at most filtered row cardinality",
789        );
790
791        Ok((
792            GroupedCursorPage {
793                rows: page_rows,
794                next_cursor,
795            },
796            execution_trace,
797        ))
798    }
799
800    // Map grouped reducer errors into executor-owned error classes.
801    fn map_group_error(err: GroupError) -> InternalError {
802        match err {
803            GroupError::MemoryLimitExceeded { .. } => {
804                InternalError::executor_internal(err.to_string())
805            }
806            GroupError::Internal(inner) => inner,
807        }
808    }
809
810    // Convert one aggregate output payload into grouped response value payload.
811    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
812        match output {
813            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
814            AggregateOutput::Exists(value) => Value::Bool(*value),
815            AggregateOutput::Min(value)
816            | AggregateOutput::Max(value)
817            | AggregateOutput::First(value)
818            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
819        }
820    }
821
822    // Evaluate grouped HAVING clauses on one finalized grouped output row.
823    fn group_matches_having(
824        having: &GroupHavingSpec,
825        group_fields: &[crate::db::query::plan::FieldSlot],
826        group_key_value: &Value,
827        aggregate_values: &[Value],
828    ) -> Result<bool, InternalError> {
829        for (index, clause) in having.clauses().iter().enumerate() {
830            let actual = match clause.symbol() {
831                GroupHavingSymbol::GroupField(field_slot) => {
832                    let group_key_list = match group_key_value {
833                        Value::List(values) => values,
834                        value => {
835                            return Err(InternalError::query_executor_invariant(format!(
836                                "grouped HAVING requires list-shaped grouped keys, found {value:?}"
837                            )));
838                        }
839                    };
840                    let Some(group_field_offset) = group_fields
841                        .iter()
842                        .position(|group_field| group_field.index() == field_slot.index())
843                    else {
844                        return Err(InternalError::query_executor_invariant(format!(
845                            "grouped HAVING field is not in grouped key projection: field='{}'",
846                            field_slot.field()
847                        )));
848                    };
849                    group_key_list.get(group_field_offset).ok_or_else(|| {
850                        InternalError::query_executor_invariant(format!(
851                            "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
852                            group_key_list.len()
853                        ))
854                    })?
855                }
856                GroupHavingSymbol::AggregateIndex(aggregate_index) => {
857                    aggregate_values.get(*aggregate_index).ok_or_else(|| {
858                        InternalError::query_executor_invariant(format!(
859                            "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
860                            aggregate_values.len()
861                        ))
862                    })?
863                }
864            };
865
866            if !Self::having_compare_values(actual, clause.op(), clause.value())? {
867                return Ok(false);
868            }
869        }
870
871        Ok(true)
872    }
873
874    // Evaluate one grouped HAVING compare operator using strict value semantics.
875    fn having_compare_values(
876        actual: &Value,
877        op: CompareOp,
878        expected: &Value,
879    ) -> Result<bool, InternalError> {
880        let strict = CoercionSpec::default();
881        let matches = match op {
882            CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
883            CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
884            CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
885            CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
886            CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
887            CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
888            CompareOp::In
889            | CompareOp::NotIn
890            | CompareOp::Contains
891            | CompareOp::StartsWith
892            | CompareOp::EndsWith => {
893                return Err(InternalError::query_executor_invariant(format!(
894                    "unsupported grouped HAVING operator reached executor: {op:?}"
895                )));
896            }
897        };
898
899        Ok(matches)
900    }
901
902    // Record shared observability outcome for any execution path.
903    fn finalize_path_outcome(
904        execution_trace: &mut Option<ExecutionTrace>,
905        optimization: Option<ExecutionOptimization>,
906        rows_scanned: usize,
907        rows_returned: usize,
908        index_predicate_applied: bool,
909        index_predicate_keys_rejected: u64,
910        distinct_keys_deduped: u64,
911    ) {
912        record_rows_scanned::<E>(rows_scanned);
913        if let Some(execution_trace) = execution_trace.as_mut() {
914            execution_trace.set_path_outcome(
915                optimization,
916                rows_scanned,
917                rows_returned,
918                index_predicate_applied,
919                index_predicate_keys_rejected,
920                distinct_keys_deduped,
921            );
922            debug_assert_eq!(
923                execution_trace.keys_scanned,
924                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
925                "execution trace keys_scanned must match rows_scanned metrics input",
926            );
927        }
928    }
929
930    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
931    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
932        plan: &AccessPlannedQuery<E::Key>,
933        cursor_boundary: Option<&CursorBoundary>,
934    ) -> Result<(), InternalError> {
935        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
936            return Ok(());
937        }
938        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
939
940        Ok(())
941    }
942}