// Source: icydb_core/db/executor/load/mod.rs

//! Module: executor::load
//! Responsibility: load-path execution orchestration, pagination, and trace contracts.
//! Does not own: logical planning semantics or relation/commit mutation policy.
//! Boundary: consumes executable load plans and delegates post-access semantics to the kernel.
5#![deny(unreachable_patterns)]
6
7mod execute;
8mod fast_stream;
9mod index_range_limit;
10mod page;
11mod pk_stream;
12mod secondary_index;
13mod terminal;
14mod trace;
15
16pub(in crate::db::executor) use self::execute::{
17    ExecutionInputs, MaterializedExecutionAttempt, ResolvedExecutionKeyStream,
18};
19
20use self::trace::{access_path_variant, execution_order_direction};
21use crate::{
22    db::{
23        Context, Db, GroupedRow,
24        access::AccessPlan,
25        contracts::canonical_value_compare,
26        cursor::{
27            ContinuationToken, CursorBoundary, GroupedContinuationToken, GroupedPlannedCursor,
28            PlannedCursor, decode_pk_cursor_boundary,
29        },
30        data::DataKey,
31        direction::Direction,
32        executor::{
33            AccessStreamBindings, ExecutablePlan, ExecutionKernel, ExecutionPreparation,
34            KeyOrderComparator, OrderedKeyStreamBox,
35            aggregate::field::{
36                AggregateFieldValueError, FieldSlot, extract_numeric_field_decimal,
37                extract_orderable_field_value, resolve_any_aggregate_target_slot,
38                resolve_numeric_aggregate_target_slot, resolve_orderable_aggregate_target_slot,
39            },
40            aggregate::{AggregateKind, AggregateOutput, FoldControl, GroupError},
41            group::{
42                CanonicalKey, GroupKeySet, KeyCanonicalError, grouped_budget_observability,
43                grouped_execution_context_from_planner_config,
44            },
45            plan_metrics::{
46                GroupedPlanMetricsStrategy, record_grouped_plan_metrics, record_plan_metrics,
47                record_rows_scanned,
48            },
49            range_token_anchor_key, range_token_from_cursor_anchor,
50            route::aggregate_materialized_fold_direction,
51            validate_executor_plan,
52        },
53        index::IndexCompilePolicy,
54        predicate::{CoercionSpec, CompareOp, MissingRowPolicy, compare_eq, compare_order},
55        query::plan::{
56            AccessPlannedQuery, GroupAggregateSpec, GroupHavingSpec, GroupHavingSymbol,
57            LogicalPlan, OrderDirection, grouped_executor_handoff,
58        },
59        response::Response,
60    },
61    error::InternalError,
62    obs::sink::{ExecKind, Span},
63    traits::{EntityKind, EntityValue},
64    types::Decimal,
65    value::Value,
66};
67use std::{cmp::Ordering, marker::PhantomData};
68
69///
70/// PageCursor
71///
72/// Internal continuation cursor enum for scalar and grouped pagination.
73///
74#[derive(Clone, Debug, Eq, PartialEq)]
75pub(in crate::db) enum PageCursor {
76    Scalar(ContinuationToken),
77    Grouped(GroupedContinuationToken),
78}
79
80impl PageCursor {
81    /// Borrow scalar continuation token when this cursor is scalar-shaped.
82    #[must_use]
83    pub(in crate::db) const fn as_scalar(&self) -> Option<&ContinuationToken> {
84        match self {
85            Self::Scalar(token) => Some(token),
86            Self::Grouped(_) => None,
87        }
88    }
89
90    /// Borrow grouped continuation token when this cursor is grouped-shaped.
91    #[must_use]
92    pub(in crate::db) const fn as_grouped(&self) -> Option<&GroupedContinuationToken> {
93        match self {
94            Self::Scalar(_) => None,
95            Self::Grouped(token) => Some(token),
96        }
97    }
98}
99
100impl From<ContinuationToken> for PageCursor {
101    fn from(value: ContinuationToken) -> Self {
102        Self::Scalar(value)
103    }
104}
105
106impl From<GroupedContinuationToken> for PageCursor {
107    fn from(value: GroupedContinuationToken) -> Self {
108        Self::Grouped(value)
109    }
110}
111
112///
113/// CursorPage
114///
115/// Internal load page result with continuation cursor payload.
116/// Returned by paged executor entrypoints.
117///
118
119#[derive(Debug)]
120pub(crate) struct CursorPage<E: EntityKind> {
121    pub(crate) items: Response<E>,
122
123    pub(crate) next_cursor: Option<PageCursor>,
124}
125
126///
127/// GroupedCursorPage
128///
129/// Internal grouped page result with grouped rows and continuation cursor payload.
130///
131#[derive(Debug)]
132pub(in crate::db) struct GroupedCursorPage {
133    pub(in crate::db) rows: Vec<GroupedRow>,
134    pub(in crate::db) next_cursor: Option<PageCursor>,
135}
136
137///
138/// ExecutionAccessPathVariant
139///
140/// Coarse access path shape used by the load execution trace surface.
141///
142
143#[derive(Clone, Copy, Debug, Eq, PartialEq)]
144pub enum ExecutionAccessPathVariant {
145    ByKey,
146    ByKeys,
147    KeyRange,
148    IndexPrefix,
149    IndexRange,
150    FullScan,
151    Union,
152    Intersection,
153}
154
155///
156/// ExecutionOptimization
157///
158/// Canonical load optimization selected by execution, if any.
159///
160
161#[derive(Clone, Copy, Debug, Eq, PartialEq)]
162pub enum ExecutionOptimization {
163    PrimaryKey,
164    SecondaryOrderPushdown,
165    IndexRangeLimitPushdown,
166}
167
168///
169/// ExecutionTrace
170///
171/// Structured, opt-in load execution introspection snapshot.
172/// Captures plan-shape and execution decisions without changing semantics.
173///
174
175#[derive(Clone, Copy, Debug, Eq, PartialEq)]
176pub struct ExecutionTrace {
177    pub access_path_variant: ExecutionAccessPathVariant,
178    pub direction: OrderDirection,
179    pub optimization: Option<ExecutionOptimization>,
180    pub keys_scanned: u64,
181    pub rows_returned: u64,
182    pub continuation_applied: bool,
183    pub index_predicate_applied: bool,
184    pub index_predicate_keys_rejected: u64,
185    pub distinct_keys_deduped: u64,
186}
187
188impl ExecutionTrace {
189    fn new<K>(access: &AccessPlan<K>, direction: Direction, continuation_applied: bool) -> Self {
190        Self {
191            access_path_variant: access_path_variant(access),
192            direction: execution_order_direction(direction),
193            optimization: None,
194            keys_scanned: 0,
195            rows_returned: 0,
196            continuation_applied,
197            index_predicate_applied: false,
198            index_predicate_keys_rejected: 0,
199            distinct_keys_deduped: 0,
200        }
201    }
202
203    fn set_path_outcome(
204        &mut self,
205        optimization: Option<ExecutionOptimization>,
206        keys_scanned: usize,
207        rows_returned: usize,
208        index_predicate_applied: bool,
209        index_predicate_keys_rejected: u64,
210        distinct_keys_deduped: u64,
211    ) {
212        self.optimization = optimization;
213        self.keys_scanned = u64::try_from(keys_scanned).unwrap_or(u64::MAX);
214        self.rows_returned = u64::try_from(rows_returned).unwrap_or(u64::MAX);
215        self.index_predicate_applied = index_predicate_applied;
216        self.index_predicate_keys_rejected = index_predicate_keys_rejected;
217        self.distinct_keys_deduped = distinct_keys_deduped;
218    }
219}
220
221/// Resolve key-stream comparator contract from runtime direction.
222pub(in crate::db::executor) const fn key_stream_comparator_from_direction(
223    direction: Direction,
224) -> KeyOrderComparator {
225    KeyOrderComparator::from_direction(direction)
226}
227
228///
229/// FastPathKeyResult
230///
231/// Internal fast-path access result.
232/// Carries ordered keys plus observability metadata for shared execution phases.
233///
234
235pub(in crate::db::executor) struct FastPathKeyResult {
236    pub(in crate::db::executor) ordered_key_stream: OrderedKeyStreamBox,
237    pub(in crate::db::executor) rows_scanned: usize,
238    pub(in crate::db::executor) optimization: ExecutionOptimization,
239}
240
241///
242/// LoadExecutor
243///
244/// Load-plan executor with canonical post-access semantics.
245/// Coordinates fast paths, trace hooks, and pagination cursors.
246///
247
248#[derive(Clone)]
249pub(crate) struct LoadExecutor<E: EntityKind> {
250    db: Db<E::Canister>,
251    debug: bool,
252    _marker: PhantomData<E>,
253}
254
255impl<E> LoadExecutor<E>
256where
257    E: EntityKind + EntityValue,
258{
259    /// Construct one load executor bound to a database handle and debug mode.
260    #[must_use]
261    pub(crate) const fn new(db: Db<E::Canister>, debug: bool) -> Self {
262        Self {
263            db,
264            debug,
265            _marker: PhantomData,
266        }
267    }
268
269    /// Recover one canonical read context for kernel-owned execution setup.
270    pub(in crate::db::executor) fn recovered_context(
271        &self,
272    ) -> Result<crate::db::Context<'_, E>, InternalError> {
273        self.db.recovered_context::<E>()
274    }
275
276    // Resolve one orderable aggregate target field into a stable slot with
277    // canonical field-error taxonomy mapping.
278    pub(in crate::db::executor) fn resolve_orderable_field_slot(
279        target_field: &str,
280    ) -> Result<FieldSlot, InternalError> {
281        resolve_orderable_aggregate_target_slot::<E>(target_field)
282            .map_err(AggregateFieldValueError::into_internal_error)
283    }
284
285    // Resolve one aggregate target field into a stable slot with canonical
286    // field-error taxonomy mapping.
287    pub(in crate::db::executor) fn resolve_any_field_slot(
288        target_field: &str,
289    ) -> Result<FieldSlot, InternalError> {
290        resolve_any_aggregate_target_slot::<E>(target_field)
291            .map_err(AggregateFieldValueError::into_internal_error)
292    }
293
294    // Resolve one numeric aggregate target field into a stable slot with
295    // canonical field-error taxonomy mapping.
296    pub(in crate::db::executor) fn resolve_numeric_field_slot(
297        target_field: &str,
298    ) -> Result<FieldSlot, InternalError> {
299        resolve_numeric_aggregate_target_slot::<E>(target_field)
300            .map_err(AggregateFieldValueError::into_internal_error)
301    }
302
303    pub(crate) fn execute(&self, plan: ExecutablePlan<E>) -> Result<Response<E>, InternalError> {
304        self.execute_paged_with_cursor(plan, PlannedCursor::none())
305            .map(|page| page.items)
306    }
307
308    pub(in crate::db) fn execute_paged_with_cursor(
309        &self,
310        plan: ExecutablePlan<E>,
311        cursor: impl Into<PlannedCursor>,
312    ) -> Result<CursorPage<E>, InternalError> {
313        self.execute_paged_with_cursor_traced(plan, cursor)
314            .map(|(page, _)| page)
315    }
316
317    pub(in crate::db) fn execute_paged_with_cursor_traced(
318        &self,
319        plan: ExecutablePlan<E>,
320        cursor: impl Into<PlannedCursor>,
321    ) -> Result<(CursorPage<E>, Option<ExecutionTrace>), InternalError> {
322        if matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
323            return Err(InternalError::query_executor_invariant(
324                "grouped plans require execute_grouped pagination entrypoints",
325            ));
326        }
327
328        let cursor: PlannedCursor = plan.revalidate_cursor(cursor.into())?;
329        let cursor_boundary = cursor.boundary().cloned();
330        let index_range_token = cursor
331            .index_range_anchor()
332            .map(range_token_from_cursor_anchor);
333
334        if !plan.mode().is_load() {
335            return Err(InternalError::query_executor_invariant(
336                "load executor requires load plans",
337            ));
338        }
339
340        let continuation_signature = plan.continuation_signature();
341        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
342        let index_range_specs = plan.index_range_specs()?.to_vec();
343        let route_plan = Self::build_execution_route_plan_for_load(
344            plan.as_inner(),
345            cursor_boundary.as_ref(),
346            index_range_token.as_ref(),
347            None,
348        )?;
349        let continuation_applied = !matches!(
350            route_plan.continuation_mode(),
351            crate::db::executor::route::ContinuationMode::Initial
352        );
353        let direction = route_plan.direction();
354        debug_assert_eq!(
355            route_plan.window().effective_offset,
356            ExecutionKernel::effective_page_offset(plan.as_inner(), cursor_boundary.as_ref()),
357            "route window effective offset must match logical plan offset semantics",
358        );
359        let mut execution_trace = self
360            .debug
361            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
362        let plan = plan.into_inner();
363        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
364
365        let result = (|| {
366            let mut span = Span::<E>::new(ExecKind::Load);
367
368            validate_executor_plan::<E>(&plan)?;
369            let ctx = self.db.recovered_context::<E>()?;
370            let execution_inputs = ExecutionInputs {
371                ctx: &ctx,
372                plan: &plan,
373                stream_bindings: AccessStreamBindings {
374                    index_prefix_specs: index_prefix_specs.as_slice(),
375                    index_range_specs: index_range_specs.as_slice(),
376                    index_range_anchor: index_range_token.as_ref().map(range_token_anchor_key),
377                    direction,
378                },
379                execution_preparation: &execution_preparation,
380            };
381
382            record_plan_metrics(&plan.access);
383            // Plan execution routing once, then execute in canonical order.
384            // Resolve one canonical key stream, then run shared page materialization/finalization.
385            let materialized = ExecutionKernel::materialize_with_optional_residual_retry(
386                &execution_inputs,
387                &route_plan,
388                cursor_boundary.as_ref(),
389                continuation_signature,
390                IndexCompilePolicy::ConservativeSubset,
391            )?;
392            let page = materialized.page;
393            let rows_scanned = materialized.rows_scanned;
394            let post_access_rows = materialized.post_access_rows;
395            let optimization = materialized.optimization;
396            let index_predicate_applied = materialized.index_predicate_applied;
397            let index_predicate_keys_rejected = materialized.index_predicate_keys_rejected;
398            let distinct_keys_deduped = materialized.distinct_keys_deduped;
399
400            Ok(Self::finalize_execution(
401                page,
402                optimization,
403                rows_scanned,
404                post_access_rows,
405                index_predicate_applied,
406                index_predicate_keys_rejected,
407                distinct_keys_deduped,
408                &mut span,
409                &mut execution_trace,
410            ))
411        })();
412
413        result.map(|page| (page, execution_trace))
414    }
415
416    pub(in crate::db) fn execute_grouped_paged_with_cursor_traced(
417        &self,
418        plan: ExecutablePlan<E>,
419        cursor: impl Into<GroupedPlannedCursor>,
420    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
421        if !matches!(&plan.as_inner().logical, LogicalPlan::Grouped(_)) {
422            return Err(InternalError::query_executor_invariant(
423                "grouped execution requires grouped logical plans",
424            ));
425        }
426
427        let cursor = plan.revalidate_grouped_cursor(cursor.into())?;
428
429        self.execute_grouped_path(plan, cursor)
430    }
431
432    // Execute grouped blocking reduction and produce grouped page rows + grouped cursor.
433    #[expect(clippy::too_many_lines)]
434    fn execute_grouped_path(
435        &self,
436        plan: ExecutablePlan<E>,
437        cursor: GroupedPlannedCursor,
438    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), InternalError> {
439        validate_executor_plan::<E>(plan.as_inner())?;
440        let grouped_handoff = grouped_executor_handoff(plan.as_inner())?;
441        let grouped_execution = grouped_handoff.execution();
442        let group_fields = grouped_handoff.group_fields().to_vec();
443        let grouped_aggregates = grouped_handoff.aggregates().to_vec();
444        let grouped_having = grouped_handoff.having().cloned();
445        let grouped_route_plan =
446            Self::build_execution_route_plan_for_grouped_handoff(grouped_handoff);
447        let grouped_route_observability =
448            grouped_route_plan.grouped_observability().ok_or_else(|| {
449                InternalError::query_executor_invariant(
450                    "grouped route planning must emit grouped observability payload",
451                )
452            })?;
453        let direction = grouped_route_plan.direction();
454        let continuation_applied = !cursor.is_empty();
455        let mut execution_trace = self
456            .debug
457            .then(|| ExecutionTrace::new(plan.access(), direction, continuation_applied));
458        let continuation_signature = plan.continuation_signature();
459        let index_prefix_specs = plan.index_prefix_specs()?.to_vec();
460        let index_range_specs = plan.index_range_specs()?.to_vec();
461
462        let mut grouped_execution_context =
463            grouped_execution_context_from_planner_config(Some(grouped_execution));
464        let max_groups_bound =
465            usize::try_from(grouped_execution_context.config().max_groups()).unwrap_or(usize::MAX);
466        let grouped_budget = grouped_budget_observability(&grouped_execution_context);
467        debug_assert!(
468            grouped_budget.max_groups() >= grouped_budget.groups()
469                && grouped_budget.max_group_bytes() >= grouped_budget.estimated_bytes()
470                && grouped_execution_context
471                    .config()
472                    .max_distinct_values_total()
473                    >= grouped_budget.distinct_values()
474                && grouped_budget.aggregate_states() >= grouped_budget.groups(),
475            "grouped budget observability invariants must hold at grouped route entry"
476        );
477
478        // Observe grouped route outcome/rejection once at grouped runtime entry.
479        let grouped_route_outcome = grouped_route_observability.outcome();
480        let grouped_route_rejection_reason = grouped_route_observability.rejection_reason();
481        let grouped_route_eligible = grouped_route_observability.eligible();
482        let grouped_route_execution_mode = grouped_route_observability.execution_mode();
483        let grouped_plan_metrics_strategy =
484            match grouped_route_observability.grouped_execution_strategy() {
485                crate::db::executor::route::GroupedExecutionStrategy::HashMaterialized => {
486                    GroupedPlanMetricsStrategy::HashMaterialized
487                }
488                crate::db::executor::route::GroupedExecutionStrategy::OrderedMaterialized => {
489                    GroupedPlanMetricsStrategy::OrderedMaterialized
490                }
491            };
492        debug_assert!(
493            grouped_route_eligible == grouped_route_rejection_reason.is_none(),
494            "grouped route eligibility and rejection reason must stay aligned",
495        );
496        debug_assert!(
497            grouped_route_outcome
498                != crate::db::executor::route::GroupedRouteDecisionOutcome::Rejected
499                || grouped_route_rejection_reason.is_some(),
500            "grouped rejected outcomes must carry a rejection reason",
501        );
502        debug_assert!(
503            matches!(
504                grouped_route_execution_mode,
505                crate::db::executor::route::ExecutionMode::Materialized
506            ),
507            "grouped execution route must remain blocking/materialized",
508        );
509        let global_distinct_field_aggregate = Self::global_distinct_field_aggregate_spec(
510            group_fields.as_slice(),
511            grouped_aggregates.as_slice(),
512            grouped_having.as_ref(),
513        )?;
514        let (mut grouped_engines, mut short_circuit_keys) =
515            if global_distinct_field_aggregate.is_none() {
516                let grouped_engines = grouped_aggregates
517                .iter()
518                .map(|aggregate| {
519                    if aggregate.target_field().is_some() {
520                        return Err(InternalError::query_executor_invariant(format!(
521                            "grouped field-target aggregate reached executor after planning: {:?}",
522                            aggregate.kind()
523                        )));
524                    }
525
526                    Ok(grouped_execution_context.create_grouped_engine::<E>(
527                        aggregate.kind(),
528                        aggregate_materialized_fold_direction(aggregate.kind()),
529                        aggregate.distinct(),
530                    ))
531                })
532                .collect::<Result<Vec<_>, _>>()?;
533                let short_circuit_keys = vec![Vec::<Value>::new(); grouped_engines.len()];
534
535                (grouped_engines, short_circuit_keys)
536            } else {
537                (Vec::new(), Vec::new())
538            };
539        let plan = plan.into_inner();
540        let execution_preparation = ExecutionPreparation::for_plan::<E>(&plan);
541
542        let mut span = Span::<E>::new(ExecKind::Load);
543        let ctx = self.db.recovered_context::<E>()?;
544        let execution_inputs = ExecutionInputs {
545            ctx: &ctx,
546            plan: &plan,
547            stream_bindings: AccessStreamBindings {
548                index_prefix_specs: index_prefix_specs.as_slice(),
549                index_range_specs: index_range_specs.as_slice(),
550                index_range_anchor: None,
551                direction,
552            },
553            execution_preparation: &execution_preparation,
554        };
555        record_grouped_plan_metrics(&plan.access, grouped_plan_metrics_strategy);
556        let mut resolved = Self::resolve_execution_key_stream_without_distinct(
557            &execution_inputs,
558            &grouped_route_plan,
559            IndexCompilePolicy::ConservativeSubset,
560        )?;
561        let mut scanned_rows = 0usize;
562        let mut filtered_rows = 0usize;
563        let compiled_predicate = execution_preparation.compiled_predicate();
564
565        if let Some((aggregate_kind, target_field)) = global_distinct_field_aggregate {
566            if !cursor.is_empty() {
567                return Err(InternalError::from_cursor_plan_error(
568                    crate::db::cursor::CursorPlanError::invalid_continuation_cursor_payload(
569                        "global DISTINCT grouped aggregates do not support continuation cursors",
570                    ),
571                ));
572            }
573
574            let global_row = Self::execute_global_distinct_field_aggregate(
575                &plan,
576                &ctx,
577                &mut resolved,
578                compiled_predicate,
579                &mut grouped_execution_context,
580                (aggregate_kind, target_field.as_str()),
581                (&mut scanned_rows, &mut filtered_rows),
582            )?;
583            let page_rows = Self::page_global_distinct_grouped_row(
584                global_row,
585                plan.scalar_plan().page.as_ref(),
586            );
587            let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
588            let optimization = resolved.optimization;
589            let index_predicate_applied = resolved.index_predicate_applied;
590            let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
591            let distinct_keys_deduped = resolved
592                .distinct_keys_deduped_counter
593                .as_ref()
594                .map_or(0, |counter| counter.get());
595            let rows_returned = page_rows.len();
596
597            Self::finalize_path_outcome(
598                &mut execution_trace,
599                optimization,
600                rows_scanned,
601                rows_returned,
602                index_predicate_applied,
603                index_predicate_keys_rejected,
604                distinct_keys_deduped,
605            );
606            span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
607
608            return Ok((
609                GroupedCursorPage {
610                    rows: page_rows,
611                    next_cursor: None,
612                },
613                execution_trace,
614            ));
615        }
616
617        // Phase 1: stream key->row reads, decode, predicate filtering, and grouped folding.
618        while let Some(key) = resolved.key_stream.next_key()? {
619            let row = match plan.scalar_plan().consistency {
620                MissingRowPolicy::Error => ctx.read_strict(&key),
621                MissingRowPolicy::Ignore => ctx.read(&key),
622            };
623            let row = match row {
624                Ok(row) => row,
625                Err(err) if err.is_not_found() => continue,
626                Err(err) => return Err(err),
627            };
628            scanned_rows = scanned_rows.saturating_add(1);
629            let (id, entity) = Context::<E>::deserialize_row((key, row))?;
630            if let Some(compiled_predicate) = compiled_predicate
631                && !compiled_predicate.eval(&entity)
632            {
633                continue;
634            }
635            filtered_rows = filtered_rows.saturating_add(1);
636
637            let group_values = group_fields
638                .iter()
639                .map(|field| {
640                    entity.get_value_by_index(field.index()).ok_or_else(|| {
641                        InternalError::query_executor_invariant(format!(
642                            "grouped field slot missing on entity: index={}",
643                            field.index()
644                        ))
645                    })
646                })
647                .collect::<Result<Vec<_>, _>>()?;
648            let group_key = Value::List(group_values)
649                .canonical_key()
650                .map_err(crate::db::executor::group::KeyCanonicalError::into_internal_error)?;
651            let canonical_group_value = group_key.canonical_value().clone();
652            let data_key = DataKey::try_new::<E>(id.key())?;
653
654            for (index, engine) in grouped_engines.iter_mut().enumerate() {
655                if short_circuit_keys[index].iter().any(|done| {
656                    canonical_value_compare(done, &canonical_group_value) == Ordering::Equal
657                }) {
658                    continue;
659                }
660
661                let fold_control = engine
662                    .ingest_grouped(group_key.clone(), &data_key, &mut grouped_execution_context)
663                    .map_err(Self::map_group_error)?;
664                if matches!(fold_control, FoldControl::Break) {
665                    short_circuit_keys[index].push(canonical_group_value.clone());
666                    debug_assert!(
667                        short_circuit_keys[index].len() <= max_groups_bound,
668                        "grouped short-circuit key tracking must stay bounded by max_groups",
669                    );
670                }
671            }
672        }
673
674        // Phase 2: finalize grouped aggregates per terminal and iterate groups in lock-step.
675        //
676        // This avoids constructing one additional full grouped `(key, aggregates)` buffer
677        // prior to pagination; we page directly while walking finalized grouped outputs.
678        let aggregate_count = grouped_engines.len();
679        if aggregate_count == 0 {
680            return Err(InternalError::query_executor_invariant(
681                "grouped execution requires at least one aggregate terminal",
682            ));
683        }
684        let mut finalized_iters = grouped_engines
685            .into_iter()
686            .map(|engine| engine.finalize_grouped().map(Vec::into_iter))
687            .collect::<Result<Vec<_>, _>>()?;
688        let mut primary_iter = finalized_iters.drain(..1).next().ok_or_else(|| {
689            InternalError::query_executor_invariant("missing grouped primary iterator")
690        })?;
691
692        // Phase 3: apply grouped resume/offset/limit while finalizing grouped outputs.
693        let initial_offset = plan
694            .scalar_plan()
695            .page
696            .as_ref()
697            .map_or(0, |page| page.offset);
698        let resume_initial_offset = if cursor.is_empty() {
699            initial_offset
700        } else {
701            cursor.initial_offset()
702        };
703        let resume_boundary = cursor
704            .last_group_key()
705            .map(|last_group_key| Value::List(last_group_key.to_vec()));
706        let apply_initial_offset = cursor.is_empty();
707        let limit = plan
708            .scalar_plan()
709            .page
710            .as_ref()
711            .and_then(|page| page.limit)
712            .and_then(|limit| usize::try_from(limit).ok());
713        let initial_offset_for_page = if apply_initial_offset {
714            usize::try_from(initial_offset).unwrap_or(usize::MAX)
715        } else {
716            0
717        };
718        let selection_bound = limit.and_then(|limit| {
719            limit
720                .checked_add(initial_offset_for_page)
721                .and_then(|count| count.checked_add(1))
722        });
723        let mut grouped_candidate_rows = Vec::<(Value, Vec<Value>)>::new();
724        if limit.is_none_or(|limit| limit != 0) {
725            for primary_output in primary_iter.by_ref() {
726                let group_key_value = primary_output.group_key().canonical_value().clone();
727                let mut aggregate_values = Vec::with_capacity(aggregate_count);
728                aggregate_values.push(Self::aggregate_output_to_value(primary_output.output()));
729                for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
730                    let sibling_output = sibling_iter.next().ok_or_else(|| {
731                        InternalError::query_executor_invariant(format!(
732                            "grouped finalize alignment missing sibling aggregate row: sibling_index={sibling_index}"
733                        ))
734                    })?;
735                    let sibling_group_key = sibling_output.group_key().canonical_value();
736                    if canonical_value_compare(sibling_group_key, &group_key_value)
737                        != Ordering::Equal
738                    {
739                        return Err(InternalError::query_executor_invariant(format!(
740                            "grouped finalize alignment mismatch at sibling_index={sibling_index}: primary_key={group_key_value:?}, sibling_key={sibling_group_key:?}"
741                        )));
742                    }
743                    aggregate_values.push(Self::aggregate_output_to_value(sibling_output.output()));
744                }
745                debug_assert_eq!(
746                    aggregate_values.len(),
747                    aggregate_count,
748                    "grouped aggregate value alignment must preserve declared aggregate count",
749                );
750                if let Some(grouped_having) = grouped_having.as_ref()
751                    && !Self::group_matches_having(
752                        grouped_having,
753                        group_fields.as_slice(),
754                        &group_key_value,
755                        aggregate_values.as_slice(),
756                    )?
757                {
758                    continue;
759                }
760
761                if let Some(resume_boundary) = resume_boundary.as_ref()
762                    && canonical_value_compare(&group_key_value, resume_boundary)
763                        != Ordering::Greater
764                {
765                    continue;
766                }
767
768                // Keep only the smallest `offset + limit + 1` canonical grouped keys when
769                // paging is bounded so grouped LIMIT does not require one full grouped buffer.
770                if let Some(selection_bound) = selection_bound {
771                    match grouped_candidate_rows.binary_search_by(|(existing_key, _)| {
772                        canonical_value_compare(existing_key, &group_key_value)
773                    }) {
774                        Ok(_) => {
775                            return Err(InternalError::query_executor_invariant(format!(
776                                "grouped finalize produced duplicate canonical group key: {group_key_value:?}"
777                            )));
778                        }
779                        Err(insert_index) => {
780                            grouped_candidate_rows
781                                .insert(insert_index, (group_key_value, aggregate_values));
782                            if grouped_candidate_rows.len() > selection_bound {
783                                let _ = grouped_candidate_rows.pop();
784                            }
785                            debug_assert!(
786                                grouped_candidate_rows.len() <= selection_bound,
787                                "bounded grouped candidate rows must stay <= selection_bound",
788                            );
789                        }
790                    }
791                } else {
792                    grouped_candidate_rows.push((group_key_value, aggregate_values));
793                    debug_assert!(
794                        grouped_candidate_rows.len() <= max_groups_bound,
795                        "grouped candidate rows must stay bounded by max_groups",
796                    );
797                }
798            }
799            for (sibling_index, sibling_iter) in finalized_iters.iter_mut().enumerate() {
800                if sibling_iter.next().is_some() {
801                    return Err(InternalError::query_executor_invariant(format!(
802                        "grouped finalize alignment has trailing sibling rows: sibling_index={sibling_index}"
803                    )));
804                }
805            }
806            if selection_bound.is_none() {
807                grouped_candidate_rows
808                    .sort_by(|(left, _), (right, _)| canonical_value_compare(left, right));
809            }
810        }
811        if let Some(selection_bound) = selection_bound {
812            debug_assert!(
813                grouped_candidate_rows.len() <= selection_bound,
814                "grouped candidate rows must remain bounded by selection_bound",
815            );
816        } else {
817            debug_assert!(
818                grouped_candidate_rows.len() <= max_groups_bound,
819                "grouped candidate rows must remain bounded by max_groups",
820            );
821        }
822
823        let mut page_rows = Vec::<GroupedRow>::new();
824        let mut last_emitted_group_key: Option<Vec<Value>> = None;
825        let mut has_more = false;
826        let mut groups_skipped_for_offset = 0usize;
827        for (group_key_value, aggregate_values) in grouped_candidate_rows {
828            if groups_skipped_for_offset < initial_offset_for_page {
829                groups_skipped_for_offset = groups_skipped_for_offset.saturating_add(1);
830                continue;
831            }
832            if let Some(limit) = limit
833                && page_rows.len() >= limit
834            {
835                has_more = true;
836                break;
837            }
838
839            let emitted_group_key = match group_key_value {
840                Value::List(values) => values,
841                value => {
842                    return Err(InternalError::query_executor_invariant(format!(
843                        "grouped canonical key must be Value::List, found {value:?}"
844                    )));
845                }
846            };
847            last_emitted_group_key = Some(emitted_group_key.clone());
848            page_rows.push(GroupedRow::new(emitted_group_key, aggregate_values));
849            debug_assert!(
850                limit.is_none_or(|bounded_limit| page_rows.len() <= bounded_limit),
851                "grouped page rows must not exceed explicit page limit",
852            );
853        }
854
855        let next_cursor = if has_more {
856            last_emitted_group_key.map(|last_group_key| {
857                PageCursor::Grouped(GroupedContinuationToken::new_with_direction(
858                    continuation_signature,
859                    last_group_key,
860                    Direction::Asc,
861                    resume_initial_offset,
862                ))
863            })
864        } else {
865            None
866        };
867        let rows_scanned = resolved.rows_scanned_override.unwrap_or(scanned_rows);
868        let optimization = resolved.optimization;
869        let index_predicate_applied = resolved.index_predicate_applied;
870        let index_predicate_keys_rejected = resolved.index_predicate_keys_rejected;
871        let distinct_keys_deduped = resolved
872            .distinct_keys_deduped_counter
873            .as_ref()
874            .map_or(0, |counter| counter.get());
875        let rows_returned = page_rows.len();
876
877        Self::finalize_path_outcome(
878            &mut execution_trace,
879            optimization,
880            rows_scanned,
881            rows_returned,
882            index_predicate_applied,
883            index_predicate_keys_rejected,
884            distinct_keys_deduped,
885        );
886        span.set_rows(u64::try_from(rows_returned).unwrap_or(u64::MAX));
887        debug_assert!(
888            filtered_rows >= rows_returned,
889            "grouped pagination must return at most filtered row cardinality",
890        );
891
892        Ok((
893            GroupedCursorPage {
894                rows: page_rows,
895                next_cursor,
896            },
897            execution_trace,
898        ))
899    }
900
901    // Map grouped reducer errors into executor-owned error classes.
902    fn map_group_error(err: GroupError) -> InternalError {
903        match err {
904            GroupError::MemoryLimitExceeded { .. } | GroupError::DistinctBudgetExceeded { .. } => {
905                InternalError::executor_internal(err.to_string())
906            }
907            GroupError::Internal(inner) => inner,
908        }
909    }
910
911    // Resolve whether this grouped shape is the supported global DISTINCT
912    // field-target aggregate contract (`COUNT` or `SUM` with zero group keys).
913    fn global_distinct_field_aggregate_spec(
914        group_fields: &[crate::db::query::plan::FieldSlot],
915        aggregates: &[GroupAggregateSpec],
916        having: Option<&GroupHavingSpec>,
917    ) -> Result<Option<(AggregateKind, String)>, InternalError> {
918        if !group_fields.is_empty() {
919            return Ok(None);
920        }
921        if aggregates.is_empty() {
922            return Ok(None);
923        }
924        if aggregates
925            .iter()
926            .all(|aggregate| aggregate.target_field().is_none())
927        {
928            return Ok(None);
929        }
930        if having.is_some() {
931            return Err(InternalError::query_executor_invariant(
932                "global DISTINCT grouped aggregate shape does not support HAVING",
933            ));
934        }
935        if aggregates.len() != 1 {
936            return Err(InternalError::query_executor_invariant(
937                "global DISTINCT grouped aggregate shape requires exactly one aggregate terminal",
938            ));
939        }
940
941        let aggregate = &aggregates[0];
942        let Some(target_field) = aggregate.target_field() else {
943            return Err(InternalError::query_executor_invariant(
944                "global DISTINCT grouped aggregate shape requires field-target aggregate",
945            ));
946        };
947        if !aggregate.distinct() {
948            return Err(InternalError::query_executor_invariant(
949                "global DISTINCT grouped aggregate shape requires DISTINCT aggregate terminal",
950            ));
951        }
952        if !matches!(aggregate.kind(), AggregateKind::Count | AggregateKind::Sum) {
953            return Err(InternalError::query_executor_invariant(format!(
954                "unsupported global DISTINCT grouped aggregate kind: {:?}",
955                aggregate.kind()
956            )));
957        }
958
959        Ok(Some((aggregate.kind(), target_field.to_string())))
960    }
961
962    // Execute one global DISTINCT field-target grouped aggregate with grouped
963    // distinct budget accounting and deterministic reducer behavior.
964    fn execute_global_distinct_field_aggregate(
965        plan: &AccessPlannedQuery<E::Key>,
966        ctx: &Context<'_, E>,
967        resolved: &mut ResolvedExecutionKeyStream,
968        compiled_predicate: Option<&crate::db::predicate::PredicateProgram>,
969        grouped_execution_context: &mut crate::db::executor::aggregate::ExecutionContext,
970        aggregate_spec: (AggregateKind, &str),
971        row_counters: (&mut usize, &mut usize),
972    ) -> Result<GroupedRow, InternalError> {
973        let (aggregate_kind, target_field) = aggregate_spec;
974        let (scanned_rows, filtered_rows) = row_counters;
975        let field_slot = if aggregate_kind.is_sum() {
976            Self::resolve_numeric_field_slot(target_field)?
977        } else {
978            Self::resolve_any_field_slot(target_field)?
979        };
980        let mut distinct_values = GroupKeySet::new();
981        let mut count = 0u32;
982        let mut sum = Decimal::ZERO;
983        let mut saw_sum_value = false;
984
985        grouped_execution_context
986            .record_implicit_single_group::<E>()
987            .map_err(Self::map_group_error)?;
988
989        while let Some(key) = resolved.key_stream.next_key()? {
990            let row = match plan.scalar_plan().consistency {
991                MissingRowPolicy::Error => ctx.read_strict(&key),
992                MissingRowPolicy::Ignore => ctx.read(&key),
993            };
994            let row = match row {
995                Ok(row) => row,
996                Err(err) if err.is_not_found() => continue,
997                Err(err) => return Err(err),
998            };
999            *scanned_rows = scanned_rows.saturating_add(1);
1000            let (_, entity) = Context::<E>::deserialize_row((key, row))?;
1001            if let Some(compiled_predicate) = compiled_predicate
1002                && !compiled_predicate.eval(&entity)
1003            {
1004                continue;
1005            }
1006            *filtered_rows = filtered_rows.saturating_add(1);
1007
1008            let distinct_value = extract_orderable_field_value(&entity, target_field, field_slot)
1009                .map_err(AggregateFieldValueError::into_internal_error)?;
1010            let distinct_key = distinct_value
1011                .canonical_key()
1012                .map_err(KeyCanonicalError::into_internal_error)?;
1013            let distinct_admitted = grouped_execution_context
1014                .admit_distinct_key(
1015                    &mut distinct_values,
1016                    grouped_execution_context
1017                        .config()
1018                        .max_distinct_values_per_group(),
1019                    distinct_key,
1020                )
1021                .map_err(Self::map_group_error)?;
1022            if !distinct_admitted {
1023                continue;
1024            }
1025
1026            if aggregate_kind.is_sum() {
1027                let numeric_value =
1028                    extract_numeric_field_decimal(&entity, target_field, field_slot)
1029                        .map_err(AggregateFieldValueError::into_internal_error)?;
1030                sum += numeric_value;
1031                saw_sum_value = true;
1032            } else {
1033                count = count.saturating_add(1);
1034            }
1035        }
1036
1037        let aggregate_value = if aggregate_kind.is_sum() {
1038            if saw_sum_value {
1039                Value::Decimal(sum)
1040            } else {
1041                Value::Null
1042            }
1043        } else {
1044            Value::Uint(u64::from(count))
1045        };
1046
1047        Ok(GroupedRow::new(Vec::new(), vec![aggregate_value]))
1048    }
1049
1050    // Apply grouped pagination semantics to the singleton global grouped row.
1051    fn page_global_distinct_grouped_row(
1052        row: GroupedRow,
1053        page: Option<&crate::db::query::plan::PageSpec>,
1054    ) -> Vec<GroupedRow> {
1055        let Some(page) = page else {
1056            return vec![row];
1057        };
1058        if page.offset > 0 || page.limit == Some(0) {
1059            return Vec::new();
1060        }
1061
1062        vec![row]
1063    }
1064
1065    // Convert one aggregate output payload into grouped response value payload.
1066    fn aggregate_output_to_value(output: &AggregateOutput<E>) -> Value {
1067        match output {
1068            AggregateOutput::Count(value) => Value::Uint(u64::from(*value)),
1069            AggregateOutput::Sum(value) => value.map_or(Value::Null, Value::Decimal),
1070            AggregateOutput::Exists(value) => Value::Bool(*value),
1071            AggregateOutput::Min(value)
1072            | AggregateOutput::Max(value)
1073            | AggregateOutput::First(value)
1074            | AggregateOutput::Last(value) => value.map_or(Value::Null, Value::from),
1075        }
1076    }
1077
1078    // Evaluate grouped HAVING clauses on one finalized grouped output row.
1079    fn group_matches_having(
1080        having: &GroupHavingSpec,
1081        group_fields: &[crate::db::query::plan::FieldSlot],
1082        group_key_value: &Value,
1083        aggregate_values: &[Value],
1084    ) -> Result<bool, InternalError> {
1085        for (index, clause) in having.clauses().iter().enumerate() {
1086            let actual = match clause.symbol() {
1087                GroupHavingSymbol::GroupField(field_slot) => {
1088                    let group_key_list = match group_key_value {
1089                        Value::List(values) => values,
1090                        value => {
1091                            return Err(InternalError::query_executor_invariant(format!(
1092                                "grouped HAVING requires list-shaped grouped keys, found {value:?}"
1093                            )));
1094                        }
1095                    };
1096                    let Some(group_field_offset) = group_fields
1097                        .iter()
1098                        .position(|group_field| group_field.index() == field_slot.index())
1099                    else {
1100                        return Err(InternalError::query_executor_invariant(format!(
1101                            "grouped HAVING field is not in grouped key projection: field='{}'",
1102                            field_slot.field()
1103                        )));
1104                    };
1105                    group_key_list.get(group_field_offset).ok_or_else(|| {
1106                        InternalError::query_executor_invariant(format!(
1107                            "grouped HAVING group key offset out of bounds: clause_index={index}, offset={group_field_offset}, key_len={}",
1108                            group_key_list.len()
1109                        ))
1110                    })?
1111                }
1112                GroupHavingSymbol::AggregateIndex(aggregate_index) => {
1113                    aggregate_values.get(*aggregate_index).ok_or_else(|| {
1114                        InternalError::query_executor_invariant(format!(
1115                            "grouped HAVING aggregate index out of bounds: clause_index={index}, aggregate_index={aggregate_index}, aggregate_count={}",
1116                            aggregate_values.len()
1117                        ))
1118                    })?
1119                }
1120            };
1121
1122            if !Self::having_compare_values(actual, clause.op(), clause.value())? {
1123                return Ok(false);
1124            }
1125        }
1126
1127        Ok(true)
1128    }
1129
1130    // Evaluate one grouped HAVING compare operator using strict value semantics.
1131    fn having_compare_values(
1132        actual: &Value,
1133        op: CompareOp,
1134        expected: &Value,
1135    ) -> Result<bool, InternalError> {
1136        let strict = CoercionSpec::default();
1137        let matches = match op {
1138            CompareOp::Eq => compare_eq(actual, expected, &strict).unwrap_or(false),
1139            CompareOp::Ne => compare_eq(actual, expected, &strict).is_some_and(|equal| !equal),
1140            CompareOp::Lt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_lt),
1141            CompareOp::Lte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_le),
1142            CompareOp::Gt => compare_order(actual, expected, &strict).is_some_and(Ordering::is_gt),
1143            CompareOp::Gte => compare_order(actual, expected, &strict).is_some_and(Ordering::is_ge),
1144            CompareOp::In
1145            | CompareOp::NotIn
1146            | CompareOp::Contains
1147            | CompareOp::StartsWith
1148            | CompareOp::EndsWith => {
1149                return Err(InternalError::query_executor_invariant(format!(
1150                    "unsupported grouped HAVING operator reached executor: {op:?}"
1151                )));
1152            }
1153        };
1154
1155        Ok(matches)
1156    }
1157
1158    // Record shared observability outcome for any execution path.
1159    fn finalize_path_outcome(
1160        execution_trace: &mut Option<ExecutionTrace>,
1161        optimization: Option<ExecutionOptimization>,
1162        rows_scanned: usize,
1163        rows_returned: usize,
1164        index_predicate_applied: bool,
1165        index_predicate_keys_rejected: u64,
1166        distinct_keys_deduped: u64,
1167    ) {
1168        record_rows_scanned::<E>(rows_scanned);
1169        if let Some(execution_trace) = execution_trace.as_mut() {
1170            execution_trace.set_path_outcome(
1171                optimization,
1172                rows_scanned,
1173                rows_returned,
1174                index_predicate_applied,
1175                index_predicate_keys_rejected,
1176                distinct_keys_deduped,
1177            );
1178            debug_assert_eq!(
1179                execution_trace.keys_scanned,
1180                u64::try_from(rows_scanned).unwrap_or(u64::MAX),
1181                "execution trace keys_scanned must match rows_scanned metrics input",
1182            );
1183        }
1184    }
1185
1186    // Preserve PK fast-path cursor-boundary error classification at the executor boundary.
1187    pub(in crate::db::executor) fn validate_pk_fast_path_boundary_if_applicable(
1188        plan: &AccessPlannedQuery<E::Key>,
1189        cursor_boundary: Option<&CursorBoundary>,
1190    ) -> Result<(), InternalError> {
1191        if !Self::pk_order_stream_fast_path_shape_supported(plan) {
1192            return Ok(());
1193        }
1194        let _ = decode_pk_cursor_boundary::<E>(cursor_boundary)?;
1195
1196        Ok(())
1197    }
1198}