Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7use crate::{
8    db::{
9        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
10        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11        access::AccessStrategy,
12        commit::CommitSchemaFingerprint,
13        cursor::{
14            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
15        },
16        diagnostics::ExecutionTrace,
17        executor::{ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan},
18        query::builder::{
19            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
20        },
21        query::explain::{
22            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
23        },
24        query::{
25            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
26            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
27        },
28    },
29    error::InternalError,
30    model::entity::EntityModel,
31    traits::{CanisterKind, EntityKind, EntityValue, Path},
32};
33#[cfg(feature = "perf-attribution")]
34use candid::CandidType;
35#[cfg(feature = "perf-attribution")]
36use serde::Deserialize;
37use std::collections::HashMap;
38
39#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
40pub(in crate::db) enum QueryPlanVisibility {
41    StoreNotReady,
42    StoreReady,
43}
44
45#[derive(Clone, Debug, Eq, Hash, PartialEq)]
46pub(in crate::db) struct QueryPlanCacheKey {
47    entity_path: &'static str,
48    schema_fingerprint: CommitSchemaFingerprint,
49    visibility: QueryPlanVisibility,
50    query_fingerprint: [u8; 32],
51}
52
53pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, AccessPlannedQuery>;
54
///
/// QueryExecutionAttribution
///
/// Top-level compile/execute instruction split reported for one typed/fluent
/// query executed at the session boundary.
///
/// `total_local_instructions` is filled in by the session as the saturating
/// sum of the compile and execute figures.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    pub compile_local_instructions: u64,
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,
}
68
// Read the local instruction counter used for query-stage attribution.
//
// On `wasm32` this returns `canic_cdk::api::performance_counter(1)`. On every
// other target it returns `0`, so stage deltas measured there are always zero.
//
// The `missing_const_for_fn` expectation is scoped to non-wasm32 builds: only
// the constant `0` body can trigger that lint, so an unconditional `expect`
// would be reported as unfulfilled when linting for wasm32.
#[cfg(feature = "perf-attribution")]
#[cfg_attr(
    not(target_arch = "wasm32"),
    expect(
        clippy::missing_const_for_fn,
        reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
    )
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
85
// Run one query stage and report how many local instructions it consumed.
//
// Returns `(delta, result)`, where `delta` is the saturating difference between
// the counter read after `run` and the counter read before it. The stage
// result is handed back untouched so the caller decides how to propagate it.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
94
95impl<C: CanisterKind> DbSession<C> {
96    fn query_plan_cache(&self) -> &std::cell::RefCell<QueryPlanCache> {
97        self.query_plan_cache
98            .get_or_init(|| std::cell::RefCell::new(QueryPlanCache::new()))
99    }
100
101    const fn visible_indexes_for_model(
102        model: &'static EntityModel,
103        visibility: QueryPlanVisibility,
104    ) -> VisibleIndexes<'static> {
105        match visibility {
106            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
107            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
108        }
109    }
110
111    #[cfg(test)]
112    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
113        self.query_plan_cache().borrow().len()
114    }
115
116    pub(in crate::db) fn query_plan_visibility_for_store_path(
117        &self,
118        store_path: &'static str,
119    ) -> Result<QueryPlanVisibility, QueryError> {
120        let store = self
121            .db
122            .recovered_store(store_path)
123            .map_err(QueryError::execute)?;
124        let visibility = if store.index_state() == crate::db::IndexState::Ready {
125            QueryPlanVisibility::StoreReady
126        } else {
127            QueryPlanVisibility::StoreNotReady
128        };
129
130        Ok(visibility)
131    }
132
133    pub(in crate::db) fn cached_structural_plan_for_authority(
134        &self,
135        entity_path: &'static str,
136        schema_fingerprint: CommitSchemaFingerprint,
137        store_path: &'static str,
138        model: &'static EntityModel,
139        query: &StructuralQuery,
140    ) -> Result<AccessPlannedQuery, QueryError> {
141        let visibility = self.query_plan_visibility_for_store_path(store_path)?;
142        let cache_key = QueryPlanCacheKey {
143            entity_path,
144            schema_fingerprint,
145            visibility,
146            query_fingerprint: query.cache_fingerprint(),
147        };
148
149        {
150            let cache = self.query_plan_cache().borrow();
151            if let Some(plan) = cache.get(&cache_key) {
152                return Ok(plan.clone());
153            }
154        }
155
156        let visible_indexes = Self::visible_indexes_for_model(model, visibility);
157        let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
158        self.query_plan_cache()
159            .borrow_mut()
160            .insert(cache_key, plan.clone());
161
162        Ok(plan)
163    }
164
165    // Resolve the planner-visible index slice for one typed query exactly once
166    // at the session boundary before handing execution/planning off to query-owned logic.
167    fn with_query_visible_indexes<E, T>(
168        &self,
169        query: &Query<E>,
170        op: impl FnOnce(
171            &Query<E>,
172            &crate::db::query::plan::VisibleIndexes<'static>,
173        ) -> Result<T, QueryError>,
174    ) -> Result<T, QueryError>
175    where
176        E: EntityKind<Canister = C>,
177    {
178        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
179        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
180
181        op(query, &visible_indexes)
182    }
183
184    // Resolve one typed structural query onto the shared lower plan cache so
185    // typed/fluent callers do not each duplicate the entity metadata plumbing.
186    fn cached_structural_plan_for_entity<E>(
187        &self,
188        query: &StructuralQuery,
189    ) -> Result<AccessPlannedQuery, QueryError>
190    where
191        E: EntityKind<Canister = C>,
192    {
193        self.cached_structural_plan_for_authority(
194            E::PATH,
195            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
196            E::Store::PATH,
197            E::MODEL,
198            query,
199        )
200    }
201
202    // Compile one typed query using only the indexes currently visible for the
203    // query's recovered store.
204    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
205        &self,
206        query: &Query<E>,
207    ) -> Result<CompiledQuery<E>, QueryError>
208    where
209        E: EntityKind<Canister = C>,
210    {
211        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
212
213        Ok(Query::<E>::compiled_query_from_plan(plan))
214    }
215
216    // Build one logical planned-query shell using only the indexes currently
217    // visible for the query's recovered store.
218    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
219        &self,
220        query: &Query<E>,
221    ) -> Result<PlannedQuery<E>, QueryError>
222    where
223        E: EntityKind<Canister = C>,
224    {
225        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
226
227        Ok(Query::<E>::planned_query_from_plan(plan))
228    }
229
230    // Project one logical explain payload using only planner-visible indexes.
231    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
232        &self,
233        query: &Query<E>,
234    ) -> Result<ExplainPlan, QueryError>
235    where
236        E: EntityKind<Canister = C>,
237    {
238        self.with_query_visible_indexes(query, |query, visible_indexes| {
239            query.explain_with_visible_indexes(visible_indexes)
240        })
241    }
242
243    // Hash one typed query plan using only the indexes currently visible for
244    // the query's recovered store.
245    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
246        &self,
247        query: &Query<E>,
248    ) -> Result<String, QueryError>
249    where
250        E: EntityKind<Canister = C>,
251    {
252        self.with_query_visible_indexes(query, |query, visible_indexes| {
253            query.plan_hash_hex_with_visible_indexes(visible_indexes)
254        })
255    }
256
257    // Explain one load execution shape using only planner-visible
258    // indexes from the recovered store state.
259    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
260        &self,
261        query: &Query<E>,
262    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
263    where
264        E: EntityValue + EntityKind<Canister = C>,
265    {
266        self.with_query_visible_indexes(query, |query, visible_indexes| {
267            query.explain_execution_with_visible_indexes(visible_indexes)
268        })
269    }
270
271    // Render one load execution descriptor as deterministic text using
272    // only planner-visible indexes from the recovered store state.
273    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
274        &self,
275        query: &Query<E>,
276    ) -> Result<String, QueryError>
277    where
278        E: EntityValue + EntityKind<Canister = C>,
279    {
280        self.with_query_visible_indexes(query, |query, visible_indexes| {
281            query.explain_execution_text_with_visible_indexes(visible_indexes)
282        })
283    }
284
285    // Render one load execution descriptor as canonical JSON using
286    // only planner-visible indexes from the recovered store state.
287    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
288        &self,
289        query: &Query<E>,
290    ) -> Result<String, QueryError>
291    where
292        E: EntityValue + EntityKind<Canister = C>,
293    {
294        self.with_query_visible_indexes(query, |query, visible_indexes| {
295            query.explain_execution_json_with_visible_indexes(visible_indexes)
296        })
297    }
298
299    // Render one load execution descriptor plus route diagnostics using
300    // only planner-visible indexes from the recovered store state.
301    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
302        &self,
303        query: &Query<E>,
304    ) -> Result<String, QueryError>
305    where
306        E: EntityValue + EntityKind<Canister = C>,
307    {
308        self.with_query_visible_indexes(query, |query, visible_indexes| {
309            query.explain_execution_verbose_with_visible_indexes(visible_indexes)
310        })
311    }
312
313    // Explain one prepared fluent aggregate terminal using only
314    // planner-visible indexes from the recovered store state.
315    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
316        &self,
317        query: &Query<E>,
318        strategy: &S,
319    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
320    where
321        E: EntityValue + EntityKind<Canister = C>,
322        S: PreparedFluentAggregateExplainStrategy,
323    {
324        self.with_query_visible_indexes(query, |query, visible_indexes| {
325            query
326                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
327        })
328    }
329
330    // Explain one `bytes_by(field)` terminal using only planner-visible
331    // indexes from the recovered store state.
332    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
333        &self,
334        query: &Query<E>,
335        target_field: &str,
336    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
337    where
338        E: EntityValue + EntityKind<Canister = C>,
339    {
340        self.with_query_visible_indexes(query, |query, visible_indexes| {
341            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
342        })
343    }
344
345    // Explain one prepared fluent projection terminal using only
346    // planner-visible indexes from the recovered store state.
347    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
348        &self,
349        query: &Query<E>,
350        strategy: &PreparedFluentProjectionStrategy,
351    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
352    where
353        E: EntityValue + EntityKind<Canister = C>,
354    {
355        self.with_query_visible_indexes(query, |query, visible_indexes| {
356            query.explain_prepared_projection_terminal_with_visible_indexes(
357                visible_indexes,
358                strategy,
359            )
360        })
361    }
362
363    // Validate that one execution strategy is admissible for scalar paged load
364    // execution and fail closed on grouped/primary-key-only routes.
365    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
366        match family {
367            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
368                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
369            )),
370            ExecutionFamily::Ordered => Ok(()),
371            ExecutionFamily::Grouped => Err(QueryError::invariant(
372                "grouped queries execute via execute(), not page().execute()",
373            )),
374        }
375    }
376
377    // Validate that one execution strategy is admissible for the grouped
378    // execution surface.
379    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
380        match family {
381            ExecutionFamily::Grouped => Ok(()),
382            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
383                "grouped execution requires grouped logical plans",
384            )),
385        }
386    }
387
388    /// Execute one scalar load/delete query and return materialized response rows.
389    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
390    where
391        E: PersistedRow<Canister = C> + EntityValue,
392    {
393        // Phase 1: compile typed intent into one prepared execution-plan contract.
394        let mode = query.mode();
395        let plan = self
396            .compile_query_with_visible_indexes(query)?
397            .into_prepared_execution_plan();
398
399        // Phase 2: delegate execution to the shared compiled-plan entry path.
400        self.execute_query_dyn(mode, plan)
401    }
402
403    /// Execute one typed query while reporting the compile/execute split at
404    /// the shared fluent query seam.
405    #[cfg(feature = "perf-attribution")]
406    #[doc(hidden)]
407    pub fn execute_query_result_with_attribution<E>(
408        &self,
409        query: &Query<E>,
410    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
411    where
412        E: PersistedRow<Canister = C> + EntityValue,
413    {
414        // Phase 1: measure compile work at the typed/fluent boundary.
415        let (compile_local_instructions, compiled) =
416            measure_query_stage(|| self.compile_query_with_visible_indexes(query));
417        let compiled = compiled?;
418        let plan = compiled.into_prepared_execution_plan();
419
420        // Phase 2: measure execute work separately using the already compiled plan.
421        let (execute_local_instructions, result) = measure_query_stage(|| {
422            if query.has_grouping() {
423                self.execute_grouped_plan_with_trace(plan, None)
424                    .map(|(page, trace)| {
425                        let next_cursor = page
426                            .next_cursor
427                            .map(|token| {
428                                let Some(token) = token.as_grouped() else {
429                                    return Err(
430                                        QueryError::grouped_paged_emitted_scalar_continuation(),
431                                    );
432                                };
433
434                                token.encode().map_err(|err| {
435                                    QueryError::serialize_internal(format!(
436                                        "failed to serialize grouped continuation cursor: {err}"
437                                    ))
438                                })
439                            })
440                            .transpose()?;
441
442                        Ok::<LoadQueryResult<E>, QueryError>(LoadQueryResult::Grouped(
443                            PagedGroupedExecutionWithTrace::new(page.rows, next_cursor, trace),
444                        ))
445                    })?
446            } else {
447                self.execute_query_dyn(query.mode(), plan)
448                    .map(LoadQueryResult::Rows)
449            }
450        });
451        let result = result?;
452        let total_local_instructions =
453            compile_local_instructions.saturating_add(execute_local_instructions);
454
455        Ok((
456            result,
457            QueryExecutionAttribution {
458                compile_local_instructions,
459                execute_local_instructions,
460                total_local_instructions,
461            },
462        ))
463    }
464
465    // Execute one typed query through the unified row/grouped result surface so
466    // higher layers do not need to branch on grouped shape themselves.
467    #[doc(hidden)]
468    pub fn execute_query_result<E>(
469        &self,
470        query: &Query<E>,
471    ) -> Result<LoadQueryResult<E>, QueryError>
472    where
473        E: PersistedRow<Canister = C> + EntityValue,
474    {
475        if query.has_grouping() {
476            return self
477                .execute_grouped(query, None)
478                .map(LoadQueryResult::Grouped);
479        }
480
481        self.execute_query(query).map(LoadQueryResult::Rows)
482    }
483
484    /// Execute one typed delete query and return only the affected-row count.
485    #[doc(hidden)]
486    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
487    where
488        E: PersistedRow<Canister = C> + EntityValue,
489    {
490        // Phase 1: fail closed if the caller routes a non-delete query here.
491        if !query.mode().is_delete() {
492            return Err(QueryError::unsupported_query(
493                "delete count execution requires delete query mode",
494            ));
495        }
496
497        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
498        let plan = self
499            .compile_query_with_visible_indexes(query)?
500            .into_prepared_execution_plan();
501
502        // Phase 3: execute the shared delete core while skipping response-row materialization.
503        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
504            .map_err(QueryError::execute)
505    }
506
507    /// Execute one scalar query from one pre-built prepared execution contract.
508    ///
509    /// This is the shared compiled-plan entry boundary used by the typed
510    /// `execute_query(...)` surface and adjacent query execution facades.
511    pub(in crate::db) fn execute_query_dyn<E>(
512        &self,
513        mode: QueryMode,
514        plan: PreparedExecutionPlan<E>,
515    ) -> Result<EntityResponse<E>, QueryError>
516    where
517        E: PersistedRow<Canister = C> + EntityValue,
518    {
519        let result = match mode {
520            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
521            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
522        };
523
524        result.map_err(QueryError::execute)
525    }
526
527    // Shared load-query terminal wrapper: build plan, run under metrics, map
528    // execution errors into query-facing errors.
529    pub(in crate::db) fn execute_load_query_with<E, T>(
530        &self,
531        query: &Query<E>,
532        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
533    ) -> Result<T, QueryError>
534    where
535        E: PersistedRow<Canister = C> + EntityValue,
536    {
537        let plan = self
538            .compile_query_with_visible_indexes(query)?
539            .into_prepared_execution_plan();
540
541        self.with_metrics(|| op(self.load_executor::<E>(), plan))
542            .map_err(QueryError::execute)
543    }
544
545    /// Build one trace payload for a query without executing it.
546    ///
547    /// This lightweight surface is intended for developer diagnostics:
548    /// plan hash, access strategy summary, and planner/executor route shape.
549    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
550    where
551        E: EntityKind<Canister = C>,
552    {
553        let compiled = self.compile_query_with_visible_indexes(query)?;
554        let explain = compiled.explain();
555        let plan_hash = compiled.plan_hash_hex();
556
557        let executable = compiled.into_prepared_execution_plan();
558        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
559        let execution_family = match query.mode() {
560            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
561            QueryMode::Delete(_) => None,
562        };
563
564        Ok(QueryTracePlan::new(
565            plan_hash,
566            access_strategy,
567            execution_family,
568            explain,
569        ))
570    }
571
572    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
573    pub(crate) fn execute_load_query_paged_with_trace<E>(
574        &self,
575        query: &Query<E>,
576        cursor_token: Option<&str>,
577    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
578    where
579        E: PersistedRow<Canister = C> + EntityValue,
580    {
581        // Phase 1: build/validate prepared execution plan and reject grouped plans.
582        let plan = self
583            .compile_query_with_visible_indexes(query)?
584            .into_prepared_execution_plan();
585        Self::ensure_scalar_paged_execution_family(
586            plan.execution_family().map_err(QueryError::execute)?,
587        )?;
588
589        // Phase 2: decode external cursor token and validate it against plan surface.
590        let cursor_bytes = decode_optional_cursor_token(cursor_token)
591            .map_err(QueryError::from_cursor_plan_error)?;
592        let cursor = plan
593            .prepare_cursor(cursor_bytes.as_deref())
594            .map_err(QueryError::from_executor_plan_error)?;
595
596        // Phase 3: execute one traced page and encode outbound continuation token.
597        let (page, trace) = self
598            .with_metrics(|| {
599                self.load_executor::<E>()
600                    .execute_paged_with_cursor_traced(plan, cursor)
601            })
602            .map_err(QueryError::execute)?;
603        let next_cursor = page
604            .next_cursor
605            .map(|token| {
606                let Some(token) = token.as_scalar() else {
607                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
608                };
609
610                token.encode().map_err(|err| {
611                    QueryError::serialize_internal(format!(
612                        "failed to serialize continuation cursor: {err}"
613                    ))
614                })
615            })
616            .transpose()?;
617
618        Ok(PagedLoadExecutionWithTrace::new(
619            page.items,
620            next_cursor,
621            trace,
622        ))
623    }
624
625    /// Execute one grouped query page with optional grouped continuation cursor.
626    ///
627    /// This is the explicit grouped execution boundary; scalar load APIs reject
628    /// grouped plans to preserve scalar response contracts.
629    pub(in crate::db) fn execute_grouped<E>(
630        &self,
631        query: &Query<E>,
632        cursor_token: Option<&str>,
633    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
634    where
635        E: PersistedRow<Canister = C> + EntityValue,
636    {
637        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
638        let next_cursor = page
639            .next_cursor
640            .map(|token| {
641                let Some(token) = token.as_grouped() else {
642                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
643                };
644
645                token.encode().map_err(|err| {
646                    QueryError::serialize_internal(format!(
647                        "failed to serialize grouped continuation cursor: {err}"
648                    ))
649                })
650            })
651            .transpose()?;
652
653        Ok(PagedGroupedExecutionWithTrace::new(
654            page.rows,
655            next_cursor,
656            trace,
657        ))
658    }
659
660    // Execute the canonical grouped query core and return the raw grouped page
661    // plus optional execution trace before outward cursor formatting.
662    fn execute_grouped_page_with_trace<E>(
663        &self,
664        query: &Query<E>,
665        cursor_token: Option<&str>,
666    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
667    where
668        E: PersistedRow<Canister = C> + EntityValue,
669    {
670        // Phase 1: build the prepared execution plan once from the typed query.
671        let plan = self
672            .compile_query_with_visible_indexes(query)?
673            .into_prepared_execution_plan();
674
675        // Phase 2: reuse the shared prepared grouped execution path.
676        self.execute_grouped_plan_with_trace(plan, cursor_token)
677    }
678
679    // Execute one grouped prepared plan page with optional grouped cursor.
680    fn execute_grouped_plan_with_trace<E>(
681        &self,
682        plan: PreparedExecutionPlan<E>,
683        cursor_token: Option<&str>,
684    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
685    where
686        E: PersistedRow<Canister = C> + EntityValue,
687    {
688        // Phase 1: validate the prepared plan shape before decoding cursors.
689        Self::ensure_grouped_execution_family(
690            plan.execution_family().map_err(QueryError::execute)?,
691        )?;
692
693        // Phase 2: decode external grouped cursor token and validate against plan.
694        let cursor = decode_optional_grouped_cursor_token(cursor_token)
695            .map_err(QueryError::from_cursor_plan_error)?;
696        let cursor = plan
697            .prepare_grouped_cursor_token(cursor)
698            .map_err(QueryError::from_executor_plan_error)?;
699
700        // Phase 3: execute one grouped page while preserving the structural
701        // grouped cursor payload for whichever outward cursor format the caller needs.
702        self.with_metrics(|| {
703            self.load_executor::<E>()
704                .execute_grouped_paged_with_cursor_traced(plan, cursor)
705        })
706        .map_err(QueryError::execute)
707    }
708}