Skip to main content

icydb_core/db/executor/load/
mod.rs

1//! Module: executor::load
2//! Responsibility: load-path execution orchestration, pagination, and trace contracts.
3//! Does not own: logical planning semantics or relation/commit mutation policy.
4//! Boundary: consumes executable load plans and delegates post-access semantics to kernel.
5
6mod execute;
7mod fast_stream;
8mod index_range_limit;
9mod page;
10mod pk_stream;
11mod secondary_index;
12mod terminal;
13mod trace;
14
15pub(in crate::db::executor) use self::execute::{
16    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
17};
18
19use self::trace::{access_path_variant, execution_order_direction};
20use crate::{
21    db::{
22        Context, Db, GroupedRow,
23        access::AccessPlan,
24        contracts::canonical_value_compare,
25        cursor::{
26            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
27            PlannedCursor, decode_pk_cursor_boundary,
28        },
29        data::DataKey,
30        direction::Direction,
31        executor::{
32            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
33            KeyOrderComparator, OrderedKeyStreamBox,
34            aggregate::field::{
35                AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
36                extract_orderable_field_value, resolve_any_aggregate_target_slot,
37                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
38            },
39            aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
40            group::{
41                CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
42                grouped_execution_context_from_planner_config,
43            },
44            plan_metrics::{
45                GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
46                record_rows_scanned,
47            },
48            range_token_anchor_key, range_token_from_cursor_anchor,
49            route::aggregate_materialized_fold_direction,
50            validate_executor_plan,
51        },
52        index::IndexCompilePolicy,
53        predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
54        query::plan::{
55            AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
56            LogicalPlan, OrderDirection, grouped_executor_handoff,
57        },
58        response::Response,
59    },
60    error::InternalError,
61    obs::sink::{ExecKind, Span},
62    traits::{EntityKind, EntityValue},
63    types::Decimal,
64    value::Value,
65};
66use std::{cmp::Ordering, marker::PhantomData};
67
68///
69/// PageCursor
70///
71/// Internal continuation cursor enum for scalar and grouped pagination.
72///
73#[derive(Clone, Debug, Eq, PartialEq)]
74pub(in crate::db) enum PageCursor {
75    Scalar(ContinuationToken),
76    Grouped(GroupedContinuationToken),
77}
78
79impl PageCursor {
80    /// Borrow scalar continuation token when this cursor is scalar-shaped.
81    #[must_use]
82    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
83        match self {
84            Self::Scalar(token) => Some(token),
85            Self::Grouped(_) => None,
86        }
87    }
88
89    /// Borrow grouped continuation token when this cursor is grouped-shaped.
90    #[must_use]
91    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
92        match self {
93            Self::Scalar(_) => None,
94            Self::Grouped(token) => Some(token),
95        }
96    }
97}
98
99impl From<ContinuationToken> for PageCursor {
100    fn from(value: ContinuationToken) -> Self {
101        Self::Scalar(value)
102    }
103}
104
105impl From<GroupedContinuationToken> for PageCursor {
106    fn from(value: GroupedContinuationToken) -> Self {
107        Self::Grouped(value)
108    }
109}
110
111///
112/// CursorPage
113///
114/// Internal load page result with continuation cursor payload.
115/// Returned by paged executor entrypoints.
116///
117
118#[derive(Debug)]
119pub(crate) struct CursorPage<E: EntityKind> {
120    pub(crate) items: Response<E>,
121
122    pub(crate) next_cursor: Option<PageCursor>,
123}
124
125///
126/// GroupedCursorPage
127///
128/// Internal grouped page result with grouped rows and continuation cursor payload.
129///
130#[derive(Debug)]
131pub(in crate::db) struct GroupedCursorPage {
132    pub(in crate::db) rows: Vec<GroupedRow>,
133    pub(in crate::db) next_cursor: Option<PageCursor>,
134}
135
///
/// ExecutionAccessPathVariant
///
/// Coarse access path shape used by the load execution trace surface.
/// Reported through `ExecutionTrace::access_path_variant`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionAccessPathVariant {
    ByKey,
    ByKeys,
    KeyRange,
    IndexPrefix,
    IndexRange,
    FullScan,
    Union,
    Intersection,
}
153
///
/// ExecutionOptimization
///
/// Canonical load optimization selected by execution, if any.
/// Reported through `ExecutionTrace::optimization` and carried by
/// `FastPathKeyResult::optimization`.
///
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionOptimization {
    PrimaryKey,
    SecondaryOrderPushdown,
    IndexRangeLimitPushdown,
}
166
167///
168/// ExecutionTrace
169///
170/// Structured, opt-in load execution introspection snapshot.
171/// Captures plan-shape and execution decisions without changing semantics.
172///
173
174#[derive(Clone, Copy, Debug, Eq, PartialEq)]
175pub struct ExecutionTrace {
176    pub access_path_variant: ExecutionAccessPathVariant,
177    pub direction: OrderDirection,
178    pub optimization: Option<ExecutionOptimization>,
179    pub keys_scanned: u64,
180    pub rows_returned: u64,
181    pub continuation_applied: bool,
182    pub index_predicate_applied: bool,
183    pub index_predicate_keys_rejected: u64,
184    pub distinct_keys_deduped: u64,
185}
186
187impl ExecutionTrace {
188    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
189        Self {
190            access_path_variant: access_path_variant(access),
191            direction: execution_order_direction(direction),
192            optimization: None,
193            keys_scanned: 0,
194            rows_returned: 0,
195            continuation_applied,
196            index_predicate_applied: false,
197            index_predicate_keys_rejected: 0,
198            distinct_keys_deduped: 0,
199        }
200    }
201
202    fn set_path_outcome(
203        &mut self,
204        optimization: Option<ExecutionOptimization>,
205        keys_scanned: usize,
206        rows_returned: usize,
207        index_predicate_applied: bool,
208        index_predicate_keys_rejected: u64,
209        distinct_keys_deduped: u64,
210    ) {
211        self.optimization = optimization;
212        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
213        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
214        self.index_predicate_applied = index_predicate_applied;
215        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
216        self.distinct_keys_deduped = distinct_keys_deduped;
217    }
218}
219
220/// Resolve key-stream comparator contract from runtime direction.
221pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
222    direction: Direction,
223) -> KeyOrderComparator {
224    KeyOrderComparator::from_direction(direction)
225}
226
227///
228/// FastPathKeyResult
229///
230/// Internal fast-path access result.
231/// Carries ordered keys plus observability metadata for shared execution phases.
232///
233
234pub(in crate::db::executor) struct FastPathKeyResult {
235    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
236    pub(in crate::db::executor) rows_scanned: usize,
237    pub(in crate::db::executor) optimization: ExecutionOptimization,
238}
239
240///
241/// LoadExecutor
242///
243/// Load-plan executor with canonical post-access semantics.
244/// Coordinates fast paths, trace hooks, and pagination cursors.
245///
246
247#[derive(Clone)]
248pub(crate) struct LoadExecutor<E: EntityKind> {
249    db: Db<E::Canister>,
250    debug: bool,
251    _marker: PhantomData<E>,
252}
253
254impl<E> LoadExecutor<E>
255where
256    E: EntityKind + EntityValue,
257{
258    /// Construct one load executor bound to a database handle and debug mode.
259    #[must_use]
260    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
261        Self {
262            db,
263            debug,
264            _marker: PhantomData,
265        }
266    }
267
268    /// Recover one canonical read context for kernel-owned execution setup.
269    pub(in crate::db::executor) fn recovered_context(
270        &self,
271    ) -> Result<crate::db::Context<'_, E>, InternalError> {
272        self.db.recovered_context::<E>()
273    }
274
275    // Resolve one orderable aggregate target field into a stable slot with
276    // canonical field-error taxonomy mapping.
277    pub(in crate::db::executor) fn resolve_orderable_field_slot(
278        target_field: &str,
279    ) -> Result<FieldSlot, InternalError> {
280        resolve_orderable_aggregate_target_slot::<E>(target_field)
281            .map_err(AggregateFieldValueError::into_internal_error)
282    }
283
284    // Resolve one aggregate target field into a stable slot with canonical
285    // field-error taxonomy mapping.
286    pub(in crate::db::executor) fn resolve_any_field_slot(
287        target_field: &str,
288    ) -> Result<FieldSlot, InternalError> {
289        resolve_any_aggregate_target_slot::<E>(target_field)
290            .map_err(AggregateFieldValueError::into_internal_error)
291    }
292
293    // Resolve one numeric aggregate target field into a stable slot with
294    // canonical field-error taxonomy mapping.
295    pub(in crate::db::executor) fn resolve_numeric_field_slot(
296        target_field: &str,
297    ) -> Result<FieldSlot, InternalError> {
298        resolve_numeric_aggregate_target_slot::<E>(target_field)
299            .map_err(AggregateFieldValueError::into_internal_error)
300    }
301
302    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
303        self.execute_paged_with_cursor(plan, PlannedCursor::none())
304            .map(|page| page.items)
305    }
306
307    pub(in crate::db) fn execute_paged_with_cursor(
308        &self,
309        plan: ExecutablePlan<E>,
310        cursor: impl Into<PlannedCursor>,
311    ) -> Result<CursorPage<E>, InternalError> {
312        self.execute_paged_with_cursor_traced(plan, cursor)
313            .map(|(page, _)| page)
314    }
315
316    pub(in crate::db) fn execute_paged_with_cursor_traced(
317        &self,
318        plan: ExecutablePlan<E>,
319        cursor: impl Into<PlannedCursor>,
320    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
321        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
322            return Err(InternalError::query_executor_invariant(
323                "grouped plans require execute_grouped pagination entrypoints",
324            ));
325        }
326
327        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
328        let cursor_boundary = cursor.boundary().cloned();
329        let index_range_token = cursor
330            .index_range_anchor()
331            .map(range_token_from_cursor_anchor);
332
333        if !plan.mode().is_load() {
334            return Err(InternalError::query_executor_invariant(
335                "load executor requires load plans",
336            ));
337        }
338
339        let continuation_signature = plan.continuation_signature();
340        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
341        let index_range_specs = plan.index_range_specs()?.to_vec();
342        let route_plan = Self::build_execution_route_plan_for_load(
343            plan.as_inner(),
344            cursor_boundary.as_ref(),
345            index_range_token.as_ref(),
346            None,
347        )?;
348        let continuation_applied = !matches!(
349            route_plan.continuation_mode(),
350            crate::db::executor::route::ContinuationMode::Initial
351        );
352        let direction = route_plan.direction();
353        debug_assert_eq!(
354            route_plan.window().effective_offset,
355            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
356            "route window effective offset must match logical plan offset semantics",
357        );
358        let mut execution_trace = self
359            .debug
360            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
361        let plan = plan.into_inner();
362        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
363
364        let result = (|| {
365            let mut span = Span::<E>::new(ExecKind::Load);
366
367            validate_executor_plan::<E>(&plan)?;
368            let ctx = self.db.recovered_context::<E>()?;
369            let execution_inputs = ExecutionInputs {
370                ctx: &ctx,
371                plan: &plan,
372                stream_bindings: AccessStreamBindings {
373                    index_prefix_specs: index_prefix_specs.as_slice(),
374                    index_range_specs: index_range_specs.as_slice(),
375                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
376                    direction,
377                },
378                execution_preparation: &execution_preparation,
379            };
380
381            record_plan_metrics(&plan.access);
382            // Plan execution routing once, then execute in canonical order.
383            // Resolve one canonical key stream, then run shared page materialization/finalization.
384            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
385                &execution_inputs,
386                &route_plan,
387                cursor_boundary.as_ref(),
388                continuation_signature,
389                IndexCompilePolicy::ConservativeSubset,
390            )?;
391            let page = materialized.page;
392            let rows_scanned = materialized.rows_scanned;
393            let post_access_rows = materialized.post_access_rows;
394            let optimization = materialized.optimization;
395            let index_predicate_applied = materialized.index_predicate_applied;
396            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
397            let distinct_keys_deduped = materialized.distinct_keys_deduped;
398
399            Ok(Self::finalize_execution(
400                page,
401                optimization,
402                rows_scanned,
403                post_access_rows,
404                index_predicate_applied,
405                index_predicate_keys_rejected,
406                distinct_keys_deduped,
407                &mut span,
408                &mut execution_trace,
409            ))
410        })();
411
412        result.map(|page| (page, execution_trace))
413    }
414
415    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
416        &self,
417        plan: ExecutablePlan<E>,
418        cursor: impl Into<GroupedPlannedCursor>,
419    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
420        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
421            return Err(InternalError::query_executor_invariant(
422                "grouped execution requires grouped logical plans",
423            ));
424        }
425
426        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
427
428        self.execute_grouped_path(plan, cursor)
429    }
430
431    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
432    #[expect(clippy::too_many_lines)]
433    fn execute_grouped_path(
434        &self,
435        plan: ExecutablePlan<E>,
436        cursor: GroupedPlannedCursor,
437    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
438        validate_executor_plan::<E>(plan.as_inner())?;
439        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
440        let grouped_execution = grouped_handoff.execution();
441        let group_fields = grouped_handoff.group_fields().to_vec();
442        let grouped_aggregates = grouped_handoff.aggregates().to_vec();
443        let grouped_having = grouped_handoff.having().cloned();
444        let grouped_route_plan =
445            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
446        let grouped_route_observability =
447            grouped_route_plan.grouped_observability().ok_or_else(|| {
448                InternalError::query_executor_invariant(
449                    "grouped route planning must emit grouped observability payload",
450                )
451            })?;
452        let direction = grouped_route_plan.direction();
453        let continuation_applied = !cursor.is_empty();
454        let mut execution_trace = self
455            .debug
456            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
457        let continuation_signature = plan.continuation_signature();
458        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
459        let index_range_specs = plan.index_range_specs()?.to_vec();
460
461        let mut grouped_execution_context =
462            grouped_execution_context_from_planner_config(Some(grouped_execution));
463        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
464        debug_assert!(
465            grouped_budget.max_groups() >= grouped_budget.groups()
466                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
467                && grouped_execution_context
468                    .config()
469                    .max_distinct_values_total()
470                    >= grouped_budget.distinct_values()
471                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
472            "grouped budget observability invariants must hold at grouped route entry"
473        );
474
475        // Observe grouped route outcome/rejection once at grouped runtime entry.
476        let grouped_route_outcome = grouped_route_observability.outcome();
477        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
478        let grouped_route_eligible = grouped_route_observability.eligible();
479        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
480        let grouped_plan_metrics_strategy =
481            match grouped_route_observability.grouped_execution_strategy() {
482                crate::db::executor::route::GroupedExecutionStrategy::HashGroup => {
483                    GroupedPlanMetricsStrategy::HashMaterialized
484                }
485                crate::db::executor::route::GroupedExecutionStrategy::OrderedGroup => {
486                    GroupedPlanMetricsStrategy::OrderedStreaming
487                }
488            };
489        debug_assert!(
490            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
491            "grouped route eligibility and rejection reason must stay aligned",
492        );
493        debug_assert!(
494            grouped_route_outcome
495                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
496                || grouped_route_rejection_reason.is_some(),
497            "grouped rejected outcomes must carry a rejection reason",
498        );
499        debug_assert!(
500            matches!(
501                grouped_route_execution_mode,
502                crate::db::executor::route::ExecutionMode::Materialized
503            ),
504            "grouped execution route must remain blocking/materialized",
505        );
506        let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
507            group_fields.as_slice(),
508            grouped_aggregates.as_slice(),
509            grouped_having.as_ref(),
510        )?;
511        let (mut grouped_engines, mut short_circuit_keys) =
512            if global_distinct_field_aggregate.is_none() {
513                let grouped_engines = grouped_aggregates
514                .iter()
515                .map(|aggregate| {
516                    if aggregate.target_field().is_some() {
517                        return Err(InternalError::query_executor_invariant(format!(
518                            "grouped field-target aggregate reached executor after planning: {:?}",
519                            aggregate.kind()
520                        )));
521                    }
522
523                    Ok(grouped_execution_context.create_grouped_engine::<E>(
524                        aggregate.kind(),
525                        aggregate_materialized_fold_direction(aggregate.kind()),
526                        aggregate.distinct(),
527                    ))
528                })
529                .collect::<Result<Vec<_>, _>>()?;
530                let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
531
532                (grouped_engines, short_circuit_keys)
533            } else {
534                (Vec::new(), Vec::new())
535            };
536        let plan = plan.into_inner();
537        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
538
539        let mut span = Span::<E>::new(ExecKind::Load);
540        let ctx = self.db.recovered_context::<E>()?;
541        let execution_inputs = ExecutionInputs {
542            ctx: &ctx,
543            plan: &plan,
544            stream_bindings: AccessStreamBindings {
545                index_prefix_specs: index_prefix_specs.as_slice(),
546                index_range_specs: index_range_specs.as_slice(),
547                index_range_anchor: None,
548                direction,
549            },
550            execution_preparation: &execution_preparation,
551        };
552        record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
553        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
554            &execution_inputs,
555            &grouped_route_plan,
556            IndexCompilePolicy::ConservativeSubset,
557        )?;
558        let mut scanned_rows = 0usize;
559        let mut filtered_rows = 0usize;
560        let compiled_predicate = execution_preparation.compiled_predicate();
561
562        if let Some((aggregate_kind, target_field)) = global_distinct_field_aggregate {
563            if !cursor.is_empty() {
564                return Err(InternalError::from_cursor_plan_error(
565                    crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
566                        "global DISTINCT grouped aggregates do not support continuation cursors",
567                    ),
568                ));
569            }
570
571            let global_row = Self::execute_global_distinct_field_aggregate(
572                &plan,
573                &ctx,
574                &mut resolved,
575                compiled_predicate,
576                &mut grouped_execution_context,
577                (aggregate_kind, target_field.as_str()),
578                (&mut scanned_rows, &mut filtered_rows),
579            )?;
580            let page_rows = Self::page_global_distinct_grouped_row(
581                global_row,
582                plan.scalar_plan().page.as_ref(),
583            );
584            let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
585            let optimization = resolved.optimization;
586            let index_predicate_applied = resolved.index_predicate_applied;
587            let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
588            let distinct_keys_deduped = resolved
589                .distinct_keys_deduped_counter
590                .as_ref()
591                .map_or(0, |counter| counter.get());
592            let rows_returned = page_rows.len();
593
594            Self::finalize_path_outcome(
595                &mut execution_trace,
596                optimization,
597                rows_scanned,
598                rows_returned,
599                index_predicate_applied,
600                index_predicate_keys_rejected,
601                distinct_keys_deduped,
602            );
603            span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
604
605            return Ok((
606                GroupedCursorPage {
607                    rows: page_rows,
608                    next_cursor: None,
609                },
610                execution_trace,
611            ));
612        }
613
614        // Phase 1: stream key->row reads, decode, predicate filtering, and grouped folding.
615        while let Some(key) = resolved.key_stream.next_key()? {
616            let row = match plan.scalar_plan().consistency {
617                MissingRowPolicy::Error => ctx.read_strict(&key),
618                MissingRowPolicy::Ignore => ctx.read(&key),
619            };
620            let row = match row {
621                Ok(row) => row,
622                Err(err) if err.is_not_found() => continue,
623                Err(err) => return Err(err),
624            };
625            scanned_rows = scanned_rows.saturating_add(1);
626            let (id, entity) = Context::<E>::deserialize_row((key, row))?;
627            if let Some(compiled_predicate) = compiled_predicate
628                && !compiled_predicate.eval(&entity)
629            {
630                continue;
631            }
632            filtered_rows = filtered_rows.saturating_add(1);
633
634            let group_values = group_fields
635                .iter()
636                .map(|field| {
637                    entity.get_value_by_index(field.index()).ok_or_else(|| {
638                        InternalError::query_executor_invariant(format!(
639                            "grouped field slot missing on entity: index={}",
640                            field.index()
641                        ))
642                    })
643                })
644                .collect::<Result<Vec<_>, _>>()?;
645            let group_key = Value::List(group_values)
646                .canonical_key()
647                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
648            let canonical_group_value = group_key.canonical_value().clone();
649            let data_key = DataKey::try_new::<E>(id.key())?;
650
651            for (index, engine) in grouped_engines.iter_mut().enumerate() {
652                if short_circuit_keys[index].iter().any(|done| {
653                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
654                }) {
655                    continue;
656                }
657
658                let fold_control = engine
659                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
660                    .map_err(Self::map_group_error)?;
661                if matches!(fold_control, FoldControl::Break) {
662                    short_circuit_keys[index].push(canonical_group_value.clone());
663                }
664            }
665        }
666
667        // Phase 2: finalize grouped aggregates per terminal and iterate groups in lock-step.
668        //
669        // This avoids constructing one additional full grouped `(key, aggregates)` buffer
670        // prior to pagination; we page directly while walking finalized grouped outputs.
671        let aggregate_count = grouped_engines.len();
672        if aggregate_count == 0 {
673            return Err(InternalError::query_executor_invariant(
674                "grouped execution requires at least one aggregate terminal",
675            ));
676        }
677        let mut finalized_iters = grouped_engines
678            .into_iter()
679            .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
680            .collect::<Result<Vec<_>, _>>()?;
681        let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
682            InternalError::query_executor_invariant("missing grouped primary iterator")
683        })?;
684
685        // Phase 3: apply grouped resume/offset/limit while finalizing grouped outputs.
686        let initial_offset = plan
687            .scalar_plan()
688            .page
689            .as_ref()
690            .map_or(0, |page| page.offset);
691        let resume_initial_offset = if cursor.is_empty() {
692            initial_offset
693        } else {
694            cursor.initial_offset()
695        };
696        let resume_boundary = cursor
697            .last_group_key()
698            .map(|last_group_key| Value::List(last_group_key.to_vec()));
699        let apply_initial_offset = cursor.is_empty();
700        let limit = plan
701            .scalar_plan()
702            .page
703            .as_ref()
704            .and_then(|page| page.limit)
705            .and_then(|limit| usize::try_from(limit).ok());
706        let initial_offset_for_page = if apply_initial_offset {
707            usize::try_from(initial_offset).unwrap_or(usize::MAX)
708        } else {
709            0
710        };
711        let selection_bound = limit.and_then(|limit| {
712            limit
713                .checked_add(initial_offset_for_page)
714                .and_then(|count| count.checked_add(1))
715        });
716        let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
717        if limit.is_none_or(|limit| limit != 0) {
718            for primary_output in primary_iter.by_ref() {
719                let group_key_value = primary_output.group_key().canonical_value().clone();
720                let mut aggregate_values = Vec::with_capacity(aggregate_count);
721                aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
722                for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
723                    let sibling_output = sibling_iter.next().ok_or_else(|| {
724                        InternalError::query_executor_invariant(format!(
725                            "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
726                        ))
727                    })?;
728                    let sibling_group_key = sibling_output.group_key().canonical_value();
729                    if canonical_value_compare(sibling_group_key, &group_key_value)
730                        != Ordering::Equal
731                    {
732                        return Err(InternalError::query_executor_invariant(format!(
733                            "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
734                        )));
735                    }
736                    aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
737                }
738                debug_assert_eq!(
739                    aggregate_values.len(),
740                    aggregate_count,
741                    "grouped aggregate value alignment must preserve declared aggregate count",
742                );
743                if let Some(grouped_having) = grouped_having.as_ref()
744                    && !Self::group_matches_having(
745                        grouped_having,
746                        group_fields.as_slice(),
747                        &group_key_value,
748                        aggregate_values.as_slice(),
749                    )?
750                {
751                    continue;
752                }
753
754                if let Some(resume_boundary) = resume_boundary.as_ref()
755                    && canonical_value_compare(&group_key_value, resume_boundary)
756                        != Ordering::Greater
757                {
758                    continue;
759                }
760
761                // Keep only the smallest `offset + limit + 1` canonical grouped keys when
762                // paging is bounded so grouped LIMIT does not require one full grouped buffer.
763                if let Some(selection_bound) = selection_bound {
764                    match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
765                        canonical_value_compare(existing_key, &group_key_value)
766                    }) {
767                        Ok(_) => {
768                            return Err(InternalError::query_executor_invariant(format!(
769                                "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
770                            )));
771                        }
772                        Err(insert_index) => {
773                            grouped_candidate_rows
774                                .insert(insert_index, (group_key_value, aggregate_values));
775                            if grouped_candidate_rows.len() > selection_bound {
776                                let _ = grouped_candidate_rows.pop();
777                            }
778                        }
779                    }
780                } else {
781                    grouped_candidate_rows.push((group_key_value, aggregate_values));
782                }
783            }
784            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
785                if sibling_iter.next().is_some() {
786                    return Err(InternalError::query_executor_invariant(format!(
787                        "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
788                    )));
789                }
790            }
791            if selection_bound.is_none() {
792                grouped_candidate_rows
793                    .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
794            }
795        }
796
797        let mut page_rows = Vec::<GroupedRow>::new();
798        let mut last_emitted_group_key: Option<Vec<Value>> = None;
799        let mut has_more = false;
800        let mut groups_skipped_for_offset = 0usize;
801        for (group_key_value, aggregate_values) in grouped_candidate_rows {
802            if groups_skipped_for_offset < initial_offset_for_page {
803                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
804                continue;
805            }
806            if let Some(limit) = limit
807                && page_rows.len() >= limit
808            {
809                has_more = true;
810                break;
811            }
812
813            let emitted_group_key = match group_key_value {
814                Value::List(values) => values,
815                value => {
816                    return Err(InternalError::query_executor_invariant(format!(
817                        "grouped canonical key must be Value::List, found {value:?}"
818                    )));
819                }
820            };
821            last_emitted_group_key = Some(emitted_group_key.clone());
822            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
823        }
824
825        let next_cursor = if has_more {
826            last_emitted_group_key.map(|last_group_key| {
827                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
828                    continuation_signature,
829                    last_group_key,
830                    Direction::Asc,
831                    resume_initial_offset,
832                ))
833            })
834        } else {
835            None
836        };
837        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
838        let optimization = resolved.optimization;
839        let index_predicate_applied = resolved.index_predicate_applied;
840        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
841        let distinct_keys_deduped = resolved
842            .distinct_keys_deduped_counter
843            .as_ref()
844            .map_or(0, |counter| counter.get());
845        let rows_returned = page_rows.len();
846
847        Self::finalize_path_outcome(
848            &mut execution_trace,
849            optimization,
850            rows_scanned,
851            rows_returned,
852            index_predicate_applied,
853            index_predicate_keys_rejected,
854            distinct_keys_deduped,
855        );
856        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
857        debug_assert!(
858            filtered_rows >= rows_returned,
859            "grouped pagination must return at most filtered row cardinality",
860        );
861
862        Ok((
863            GroupedCursorPage {
864                rows: page_rows,
865                next_cursor,
866            },
867            execution_trace,
868        ))
869    }
870
871    // Map grouped reducer errors into executor-owned error classes.
872    fn map_group_error(err: GroupError) -> InternalError {
873        match err {
874            GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
875                InternalError::executor_internal(err.to_string())
876            }
877            GroupError::Internal(inner) => inner,
878        }
879    }
880
881    // Resolve whether this grouped shape is the supported global DISTINCT
882    // field-target aggregate contract (`COUNT` or `SUM` with zero group keys).
883    fn global_distinct_field_aggregate_spec(
884        group_fields: &[crate::db::query::plan::FieldSlot],
885        aggregates: &[GroupAggregateSpec],
886        having: Option<&GroupHavingSpec>,
887    ) -> Result<Option<(AggregateKind, String)>, InternalError> {
888        if !group_fields.is_empty() {
889            return Ok(None);
890        }
891        if aggregates.is_empty() {
892            return Ok(None);
893        }
894        if aggregates
895            .iter()
896            .all(|aggregate| aggregate.target_field().is_none())
897        {
898            return Ok(None);
899        }
900        if having.is_some() {
901            return Err(InternalError::query_executor_invariant(
902                "global DISTINCT grouped aggregate shape does not support HAVING",
903            ));
904        }
905        if aggregates.len() != 1 {
906            return Err(InternalError::query_executor_invariant(
907                "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
908            ));
909        }
910
911        let aggregate = &aggregates[0];
912        let Some(target_field) = aggregate.target_field() else {
913            return Err(InternalError::query_executor_invariant(
914                "global DISTINCT grouped aggregate shape requires field-target aggregate",
915            ));
916        };
917        if !aggregate.distinct() {
918            return Err(InternalError::query_executor_invariant(
919                "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
920            ));
921        }
922        if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
923            return Err(InternalError::query_executor_invariant(format!(
924                "unsupported global DISTINCT grouped aggregate kind: {:?}",
925                aggregate.kind()
926            )));
927        }
928
929        Ok(Some((aggregate.kind(), target_field.to_string())))
930    }
931
932    // Execute one global DISTINCT field-target grouped aggregate with grouped
933    // distinct budget accounting and deterministic reducer behavior.
934    fn execute_global_distinct_field_aggregate(
935        plan: &AccessPlannedQuery<E::Key>,
936        ctx: &Context<'_, E>,
937        resolved: &mut ResolvedExecutionKeyStream,
938        compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
939        grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
940        aggregate_spec: (AggregateKind, &str),
941        row_counters: (&mut usize, &mut usize),
942    ) -> Result<GroupedRow, InternalError> {
943        let (aggregate_kind, target_field) = aggregate_spec;
944        let (scanned_rows, filtered_rows) = row_counters;
945        let field_slot = if aggregate_kind.is_sum() {
946            Self::resolve_numeric_field_slot(target_field)?
947        } else {
948            Self::resolve_any_field_slot(target_field)?
949        };
950        let mut distinct_values = GroupKeySet::new();
951        let mut count = 0u32;
952        let mut sum = Decimal::ZERO;
953        let mut saw_sum_value = false;
954
955        grouped_execution_context
956            .record_implicit_single_group::<E>()
957            .map_err(Self::map_group_error)?;
958
959        while let Some(key) = resolved.key_stream.next_key()? {
960            let row = match plan.scalar_plan().consistency {
961                MissingRowPolicy::Error => ctx.read_strict(&key),
962                MissingRowPolicy::Ignore => ctx.read(&key),
963            };
964            let row = match row {
965                Ok(row) => row,
966                Err(err) if err.is_not_found() => continue,
967                Err(err) => return Err(err),
968            };
969            *scanned_rows = scanned_rows.saturating_add(1);
970            let (_, entity) = Context::<E>::deserialize_row((key, row))?;
971            if let Some(compiled_predicate) = compiled_predicate
972                && !compiled_predicate.eval(&entity)
973            {
974                continue;
975            }
976            *filtered_rows = filtered_rows.saturating_add(1);
977
978            let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
979                .map_err(AggregateFieldValueError::into_internal_error)?;
980            let distinct_key = distinct_value
981                .canonical_key()
982                .map_err(KeyCanonicalError::into_internal_error)?;
983            if distinct_values.contains_key(&distinct_key) {
984                continue;
985            }
986
987            let attempted_total = grouped_execution_context
988                .budget()
989                .distinct_values()
990                .saturating_add(1);
991            if attempted_total
992                > grouped_execution_context
993                    .config()
994                    .max_distinct_values_total()
995            {
996                return Err(Self::map_group_error(GroupError::DistinctBudgetExceeded {
997                    resource: "distinct_values_total",
998                    attempted: attempted_total,
999                    limit: grouped_execution_context
1000                        .config()
1001                        .max_distinct_values_total(),
1002                }));
1003            }
1004
1005            let attempted_per_group = u64::try_from(distinct_values.len())
1006                .unwrap_or(u64::MAX)
1007                .saturating_add(1);
1008            if attempted_per_group
1009                > grouped_execution_context
1010                    .config()
1011                    .max_distinct_values_per_group()
1012            {
1013                return Err(Self::map_group_error(GroupError::DistinctBudgetExceeded {
1014                    resource: "distinct_values_per_group",
1015                    attempted: attempted_per_group,
1016                    limit: grouped_execution_context
1017                        .config()
1018                        .max_distinct_values_per_group(),
1019                }));
1020            }
1021
1022            let inserted = distinct_values.insert_key(distinct_key);
1023            debug_assert!(inserted, "new global distinct key must insert exactly once");
1024            grouped_execution_context
1025                .record_distinct_value()
1026                .map_err(Self::map_group_error)?;
1027
1028            if aggregate_kind.is_sum() {
1029                let numeric_value =
1030                    extract_numeric_field_decimal(&entity, target_field, field_slot)
1031                        .map_err(AggregateFieldValueError::into_internal_error)?;
1032                sum += numeric_value;
1033                saw_sum_value = true;
1034            } else {
1035                count = count.saturating_add(1);
1036            }
1037        }
1038
1039        let aggregate_value = if aggregate_kind.is_sum() {
1040            if saw_sum_value {
1041                Value::Decimal(sum)
1042            } else {
1043                Value::Null
1044            }
1045        } else {
1046            Value::Uint(u64::from(count))
1047        };
1048
1049        Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
1050    }
1051
1052    // Apply grouped pagination semantics to the singleton global grouped row.
1053    fn page_global_distinct_grouped_row(
1054        row: GroupedRow,
1055        page: Option<&crate::db::query::plan::PageSpec>,
1056    ) -> Vec<GroupedRow> {
1057        let Some(page) = page else {
1058            return vec![row];
1059        };
1060        if page.offset > 0 || page.limit == Some(0) {
1061            return Vec::new();
1062        }
1063
1064        vec![row]
1065    }
1066
1067    // Convert one aggregate output payload into grouped response value payload.
1068    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1069        match output {
1070            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1071            AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1072            AggregateOutput::Exists(value) => Value::Bool(*value),
1073            AggregateOutput::Min(value)
1074            | AggregateOutput::Max(value)
1075            | AggregateOutput::First(value)
1076            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1077        }
1078    }
1079
1080    // Evaluate grouped HAVING clauses on one finalized grouped output row.
1081    fn group_matches_having(
1082        having: &GroupHavingSpec,
1083        group_fields: &[crate::db::query::plan::FieldSlot],
1084        group_key_value: &Value,
1085        aggregate_values: &[Value],
1086    ) -> Result<bool, InternalError> {
1087        for (index, clause) in having.clauses().iter().enumerate() {
1088            let actual = match clause.symbol() {
1089                GroupHavingSymbol::GroupField(field_slot) => {
1090                    let group_key_list = match group_key_value {
1091                        Value::List(values) => values,
1092                        value => {
1093                            return Err(InternalError::query_executor_invariant(format!(
1094                                "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1095                            )));
1096                        }
1097                    };
1098                    let Some(group_field_offset) = group_fields
1099                        .iter()
1100                        .position(|group_field| group_field.index() == field_slot.index())
1101                    else {
1102                        return Err(InternalError::query_executor_invariant(format!(
1103                            "grouped HAVING field is not in grouped key projection: field='{}'",
1104                            field_slot.field()
1105                        )));
1106                    };
1107                    group_key_list.get(group_field_offset).ok_or_else(|| {
1108                        InternalError::query_executor_invariant(format!(
1109                            "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1110                            group_key_list.len()
1111                        ))
1112                    })?
1113                }
1114                GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1115                    aggregate_values.get(*aggregate_index).ok_or_else(|| {
1116                        InternalError::query_executor_invariant(format!(
1117                            "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1118                            aggregate_values.len()
1119                        ))
1120                    })?
1121                }
1122            };
1123
1124            if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1125                return Ok(false);
1126            }
1127        }
1128
1129        Ok(true)
1130    }
1131
1132    // Evaluate one grouped HAVING compare operator using strict value semantics.
1133    fn having_compare_values(
1134        actual: &Value,
1135        op: CompareOp,
1136        expected: &Value,
1137    ) -> Result<bool, InternalError> {
1138        let strict = CoercionSpec::default();
1139        let matches = match op {
1140            CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1141            CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1142            CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1143            CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1144            CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1145            CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1146            CompareOp::In
1147            | CompareOp::NotIn
1148            | CompareOp::Contains
1149            | CompareOp::StartsWith
1150            | CompareOp::EndsWith => {
1151                return Err(InternalError::query_executor_invariant(format!(
1152                    "unsupported grouped HAVING operator reached executor: {op:?}"
1153                )));
1154            }
1155        };
1156
1157        Ok(matches)
1158    }
1159
1160    // Record shared observability outcome for any execution path.
1161    fn finalize_path_outcome(
1162        execution_trace: &mut Option<ExecutionTrace>,
1163        optimization: Option<ExecutionOptimization>,
1164        rows_scanned: usize,
1165        rows_returned: usize,
1166        index_predicate_applied: bool,
1167        index_predicate_keys_rejected: u64,
1168        distinct_keys_deduped: u64,
1169    ) {
1170        record_rows_scanned::<E>(rows_scanned);
1171        if let Some(execution_trace) = execution_trace.as_mut() {
1172            execution_trace.set_path_outcome(
1173                optimization,
1174                rows_scanned,
1175                rows_returned,
1176                index_predicate_applied,
1177                index_predicate_keys_rejected,
1178                distinct_keys_deduped,
1179            );
1180            debug_assert_eq!(
1181                execution_trace.keys_scanned,
1182                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1183                "execution trace keys_scanned must match rows_scanned metrics input",
1184            );
1185        }
1186    }
1187
1188    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
1189    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1190        plan: &AccessPlannedQuery<E::Key>,
1191        cursor_boundary: Option<&CursorBoundary>,
1192    ) -> Result<(), InternalError> {
1193        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1194            return Ok(());
1195        }
1196        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1197
1198        Ok(())
1199    }
1200}