Skip to main content

icydb_core/db/session/
query.rs

1//! Module: db::session::query
2//! Responsibility: session-bound query planning, explain, and cursor execution
3//! helpers that recover store visibility before delegating to query-owned logic.
4//! Does not own: query intent construction or executor runtime semantics.
5//! Boundary: resolves session visibility and cursor policy before handing work to the planner/executor.
6
7use crate::{
8    db::{
9        DbSession, EntityResponse, LoadQueryResult, PagedGroupedExecutionWithTrace,
10        PagedLoadExecutionWithTrace, PersistedRow, Query, QueryError, QueryTracePlan,
11        access::AccessStrategy,
12        commit::CommitSchemaFingerprint,
13        cursor::{
14            CursorPlanError, decode_optional_cursor_token, decode_optional_grouped_cursor_token,
15        },
16        diagnostics::ExecutionTrace,
17        executor::{
18            ExecutionFamily, GroupedCursorPage, LoadExecutor, PreparedExecutionPlan,
19            SharedPreparedExecutionPlan,
20        },
21        query::builder::{
22            PreparedFluentAggregateExplainStrategy, PreparedFluentProjectionStrategy,
23        },
24        query::explain::{
25            ExplainAggregateTerminalPlan, ExplainExecutionNodeDescriptor, ExplainPlan,
26        },
27        query::{
28            intent::{CompiledQuery, PlannedQuery, StructuralQuery},
29            plan::{AccessPlannedQuery, QueryMode, VisibleIndexes},
30        },
31    },
32    error::InternalError,
33    model::entity::EntityModel,
34    traits::{CanisterKind, EntityKind, EntityValue, Path},
35};
36#[cfg(feature = "perf-attribution")]
37use candid::CandidType;
38#[cfg(feature = "perf-attribution")]
39use serde::Deserialize;
40use std::{cell::RefCell, collections::HashMap};
41
42#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
43pub(in crate::db) enum QueryPlanVisibility {
44    StoreNotReady,
45    StoreReady,
46}
47
48#[derive(Clone, Debug, Eq, Hash, PartialEq)]
49pub(in crate::db) struct QueryPlanCacheKey {
50    entity_path: &'static str,
51    schema_fingerprint: CommitSchemaFingerprint,
52    visibility: QueryPlanVisibility,
53    query_fingerprint: [u8; 32],
54}
55
56#[derive(Clone, Debug)]
57pub(in crate::db) struct QueryPlanCacheEntry {
58    logical_plan: AccessPlannedQuery,
59    prepared_plan: SharedPreparedExecutionPlan,
60}
61
62impl QueryPlanCacheEntry {
63    #[must_use]
64    pub(in crate::db) const fn new(
65        logical_plan: AccessPlannedQuery,
66        prepared_plan: SharedPreparedExecutionPlan,
67    ) -> Self {
68        Self {
69            logical_plan,
70            prepared_plan,
71        }
72    }
73
74    #[must_use]
75    pub(in crate::db) const fn logical_plan(&self) -> &AccessPlannedQuery {
76        &self.logical_plan
77    }
78
79    #[must_use]
80    pub(in crate::db) fn typed_prepared_plan<E: EntityKind>(&self) -> PreparedExecutionPlan<E> {
81        self.prepared_plan.typed_clone::<E>()
82    }
83}
84
85pub(in crate::db) type QueryPlanCache = HashMap<QueryPlanCacheKey, QueryPlanCacheEntry>;
86
87thread_local! {
88    // Keep one in-heap query-plan cache per store registry so fresh `DbSession`
89    // facades can share prepared logical plans across update/query calls while
90    // tests and multi-registry host processes remain isolated by registry
91    // identity.
92    static QUERY_PLAN_CACHES: RefCell<HashMap<usize, QueryPlanCache>> =
93        RefCell::new(HashMap::new());
94}
95
96#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub(in crate::db) struct QueryPlanCacheAttribution {
98    pub hits: u64,
99    pub misses: u64,
100}
101
102impl QueryPlanCacheAttribution {
103    #[must_use]
104    const fn hit() -> Self {
105        Self { hits: 1, misses: 0 }
106    }
107
108    #[must_use]
109    const fn miss() -> Self {
110        Self { hits: 0, misses: 1 }
111    }
112}
113
///
/// QueryExecutionAttribution
///
/// Top-level compile/execute instruction split for typed/fluent query
/// execution, captured at the session boundary together with the shared
/// plan-cache outcome.
///
#[cfg(feature = "perf-attribution")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct QueryExecutionAttribution {
    /// Local instructions measured around the compile stage.
    pub compile_local_instructions: u64,
    /// Local instructions measured around the execute stage.
    pub execute_local_instructions: u64,
    /// Saturating sum of the compile and execute measurements.
    pub total_local_instructions: u64,
    /// Shared plan-cache hits attributed to this execution.
    pub shared_query_plan_cache_hits: u64,
    /// Shared plan-cache misses attributed to this execution.
    pub shared_query_plan_cache_misses: u64,
}
129
// Read the local instruction counter used for query-stage attribution.
//
// Only wasm32 builds query the runtime (`performance_counter(1)`); every other
// target reports a constant zero, so deltas measured there are always zero.
#[cfg(feature = "perf-attribution")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_query_local_instruction_counter() -> u64 {
    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }

    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }
}
146
// Run one fallible query stage and report how many local instructions it
// consumed alongside its untouched result. The delta saturates at zero so a
// counter that does not advance can never underflow.
#[cfg(feature = "perf-attribution")]
fn measure_query_stage<T, E>(run: impl FnOnce() -> Result<T, E>) -> (u64, Result<T, E>) {
    let before = read_query_local_instruction_counter();
    let outcome = run();
    let after = read_query_local_instruction_counter();

    (after.saturating_sub(before), outcome)
}
155
156impl<C: CanisterKind> DbSession<C> {
157    fn query_plan_cache_scope_id(&self) -> usize {
158        self.db.cache_scope_id()
159    }
160
161    fn with_query_plan_cache<R>(&self, f: impl FnOnce(&mut QueryPlanCache) -> R) -> R {
162        let scope_id = self.query_plan_cache_scope_id();
163
164        QUERY_PLAN_CACHES.with(|caches| {
165            let mut caches = caches.borrow_mut();
166            let cache = caches.entry(scope_id).or_default();
167
168            f(cache)
169        })
170    }
171
172    const fn visible_indexes_for_model(
173        model: &'static EntityModel,
174        visibility: QueryPlanVisibility,
175    ) -> VisibleIndexes<'static> {
176        match visibility {
177            QueryPlanVisibility::StoreReady => VisibleIndexes::planner_visible(model.indexes()),
178            QueryPlanVisibility::StoreNotReady => VisibleIndexes::none(),
179        }
180    }
181
182    #[cfg(test)]
183    pub(in crate::db) fn query_plan_cache_len(&self) -> usize {
184        self.with_query_plan_cache(|cache| cache.len())
185    }
186
187    #[cfg(test)]
188    pub(in crate::db) fn clear_query_plan_cache_for_tests(&self) {
189        self.with_query_plan_cache(QueryPlanCache::clear);
190    }
191
192    pub(in crate::db) fn query_plan_visibility_for_store_path(
193        &self,
194        store_path: &'static str,
195    ) -> Result<QueryPlanVisibility, QueryError> {
196        let store = self
197            .db
198            .recovered_store(store_path)
199            .map_err(QueryError::execute)?;
200        let visibility = if store.index_state() == crate::db::IndexState::Ready {
201            QueryPlanVisibility::StoreReady
202        } else {
203            QueryPlanVisibility::StoreNotReady
204        };
205
206        Ok(visibility)
207    }
208
209    pub(in crate::db) fn cached_query_plan_entry_for_authority(
210        &self,
211        authority: crate::db::executor::EntityAuthority,
212        schema_fingerprint: CommitSchemaFingerprint,
213        query: &StructuralQuery,
214    ) -> Result<(QueryPlanCacheEntry, QueryPlanCacheAttribution), QueryError> {
215        let visibility = self.query_plan_visibility_for_store_path(authority.store_path())?;
216        let cache_key = QueryPlanCacheKey {
217            entity_path: authority.entity_path(),
218            schema_fingerprint,
219            visibility,
220            query_fingerprint: query.cache_fingerprint(),
221        };
222
223        {
224            let cached = self.with_query_plan_cache(|cache| cache.get(&cache_key).cloned());
225            if let Some(entry) = cached {
226                return Ok((entry, QueryPlanCacheAttribution::hit()));
227            }
228        }
229
230        let visible_indexes = Self::visible_indexes_for_model(authority.model(), visibility);
231        let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
232        let entry = QueryPlanCacheEntry::new(
233            plan.clone(),
234            SharedPreparedExecutionPlan::from_plan(authority, plan),
235        );
236        self.with_query_plan_cache(|cache| {
237            cache.insert(cache_key, entry.clone());
238        });
239
240        Ok((entry, QueryPlanCacheAttribution::miss()))
241    }
242
243    pub(in crate::db) fn cached_structural_plan_for_authority(
244        &self,
245        authority: crate::db::executor::EntityAuthority,
246        schema_fingerprint: CommitSchemaFingerprint,
247        query: &StructuralQuery,
248    ) -> Result<AccessPlannedQuery, QueryError> {
249        let (entry, _) =
250            self.cached_query_plan_entry_for_authority(authority, schema_fingerprint, query)?;
251
252        Ok(entry.logical_plan().clone())
253    }
254
255    // Resolve the planner-visible index slice for one typed query exactly once
256    // at the session boundary before handing execution/planning off to query-owned logic.
257    fn with_query_visible_indexes<E, T>(
258        &self,
259        query: &Query<E>,
260        op: impl FnOnce(
261            &Query<E>,
262            &crate::db::query::plan::VisibleIndexes<'static>,
263        ) -> Result<T, QueryError>,
264    ) -> Result<T, QueryError>
265    where
266        E: EntityKind<Canister = C>,
267    {
268        let visibility = self.query_plan_visibility_for_store_path(E::Store::PATH)?;
269        let visible_indexes = Self::visible_indexes_for_model(E::MODEL, visibility);
270
271        op(query, &visible_indexes)
272    }
273
274    // Resolve one typed structural query onto the shared lower plan cache so
275    // typed/fluent callers do not each duplicate the entity metadata plumbing.
276    fn cached_structural_plan_for_entity<E>(
277        &self,
278        query: &StructuralQuery,
279    ) -> Result<AccessPlannedQuery, QueryError>
280    where
281        E: EntityKind<Canister = C>,
282    {
283        self.cached_structural_plan_for_authority(
284            crate::db::executor::EntityAuthority::for_type::<E>(),
285            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
286            query,
287        )
288    }
289
290    fn cached_prepared_query_plan_for_entity<E>(
291        &self,
292        query: &StructuralQuery,
293    ) -> Result<(PreparedExecutionPlan<E>, QueryPlanCacheAttribution), QueryError>
294    where
295        E: EntityKind<Canister = C>,
296    {
297        let (entry, attribution) = self.cached_query_plan_entry_for_authority(
298            crate::db::executor::EntityAuthority::for_type::<E>(),
299            crate::db::schema::commit_schema_fingerprint_for_entity::<E>(),
300            query,
301        )?;
302
303        Ok((entry.typed_prepared_plan::<E>(), attribution))
304    }
305
306    // Compile one typed query using only the indexes currently visible for the
307    // query's recovered store.
308    pub(in crate::db) fn compile_query_with_visible_indexes<E>(
309        &self,
310        query: &Query<E>,
311    ) -> Result<CompiledQuery<E>, QueryError>
312    where
313        E: EntityKind<Canister = C>,
314    {
315        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
316
317        Ok(Query::<E>::compiled_query_from_plan(plan))
318    }
319
320    // Build one logical planned-query shell using only the indexes currently
321    // visible for the query's recovered store.
322    pub(in crate::db) fn planned_query_with_visible_indexes<E>(
323        &self,
324        query: &Query<E>,
325    ) -> Result<PlannedQuery<E>, QueryError>
326    where
327        E: EntityKind<Canister = C>,
328    {
329        let plan = self.cached_structural_plan_for_entity::<E>(query.structural())?;
330
331        Ok(Query::<E>::planned_query_from_plan(plan))
332    }
333
334    // Project one logical explain payload using only planner-visible indexes.
335    pub(in crate::db) fn explain_query_with_visible_indexes<E>(
336        &self,
337        query: &Query<E>,
338    ) -> Result<ExplainPlan, QueryError>
339    where
340        E: EntityKind<Canister = C>,
341    {
342        self.with_query_visible_indexes(query, |query, visible_indexes| {
343            query.explain_with_visible_indexes(visible_indexes)
344        })
345    }
346
347    // Hash one typed query plan using only the indexes currently visible for
348    // the query's recovered store.
349    pub(in crate::db) fn query_plan_hash_hex_with_visible_indexes<E>(
350        &self,
351        query: &Query<E>,
352    ) -> Result<String, QueryError>
353    where
354        E: EntityKind<Canister = C>,
355    {
356        self.with_query_visible_indexes(query, |query, visible_indexes| {
357            query.plan_hash_hex_with_visible_indexes(visible_indexes)
358        })
359    }
360
361    // Explain one load execution shape using only planner-visible
362    // indexes from the recovered store state.
363    pub(in crate::db) fn explain_query_execution_with_visible_indexes<E>(
364        &self,
365        query: &Query<E>,
366    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
367    where
368        E: EntityValue + EntityKind<Canister = C>,
369    {
370        self.with_query_visible_indexes(query, |query, visible_indexes| {
371            query.explain_execution_with_visible_indexes(visible_indexes)
372        })
373    }
374
375    // Render one load execution descriptor as deterministic text using
376    // only planner-visible indexes from the recovered store state.
377    pub(in crate::db) fn explain_query_execution_text_with_visible_indexes<E>(
378        &self,
379        query: &Query<E>,
380    ) -> Result<String, QueryError>
381    where
382        E: EntityValue + EntityKind<Canister = C>,
383    {
384        self.with_query_visible_indexes(query, |query, visible_indexes| {
385            query.explain_execution_text_with_visible_indexes(visible_indexes)
386        })
387    }
388
389    // Render one load execution descriptor as canonical JSON using
390    // only planner-visible indexes from the recovered store state.
391    pub(in crate::db) fn explain_query_execution_json_with_visible_indexes<E>(
392        &self,
393        query: &Query<E>,
394    ) -> Result<String, QueryError>
395    where
396        E: EntityValue + EntityKind<Canister = C>,
397    {
398        self.with_query_visible_indexes(query, |query, visible_indexes| {
399            query.explain_execution_json_with_visible_indexes(visible_indexes)
400        })
401    }
402
403    // Render one load execution descriptor plus route diagnostics using
404    // only planner-visible indexes from the recovered store state.
405    pub(in crate::db) fn explain_query_execution_verbose_with_visible_indexes<E>(
406        &self,
407        query: &Query<E>,
408    ) -> Result<String, QueryError>
409    where
410        E: EntityValue + EntityKind<Canister = C>,
411    {
412        self.with_query_visible_indexes(query, |query, visible_indexes| {
413            query.explain_execution_verbose_with_visible_indexes(visible_indexes)
414        })
415    }
416
417    // Explain one prepared fluent aggregate terminal using only
418    // planner-visible indexes from the recovered store state.
419    pub(in crate::db) fn explain_query_prepared_aggregate_terminal_with_visible_indexes<E, S>(
420        &self,
421        query: &Query<E>,
422        strategy: &S,
423    ) -> Result<ExplainAggregateTerminalPlan, QueryError>
424    where
425        E: EntityValue + EntityKind<Canister = C>,
426        S: PreparedFluentAggregateExplainStrategy,
427    {
428        self.with_query_visible_indexes(query, |query, visible_indexes| {
429            query
430                .explain_prepared_aggregate_terminal_with_visible_indexes(visible_indexes, strategy)
431        })
432    }
433
434    // Explain one `bytes_by(field)` terminal using only planner-visible
435    // indexes from the recovered store state.
436    pub(in crate::db) fn explain_query_bytes_by_with_visible_indexes<E>(
437        &self,
438        query: &Query<E>,
439        target_field: &str,
440    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
441    where
442        E: EntityValue + EntityKind<Canister = C>,
443    {
444        self.with_query_visible_indexes(query, |query, visible_indexes| {
445            query.explain_bytes_by_with_visible_indexes(visible_indexes, target_field)
446        })
447    }
448
449    // Explain one prepared fluent projection terminal using only
450    // planner-visible indexes from the recovered store state.
451    pub(in crate::db) fn explain_query_prepared_projection_terminal_with_visible_indexes<E>(
452        &self,
453        query: &Query<E>,
454        strategy: &PreparedFluentProjectionStrategy,
455    ) -> Result<ExplainExecutionNodeDescriptor, QueryError>
456    where
457        E: EntityValue + EntityKind<Canister = C>,
458    {
459        self.with_query_visible_indexes(query, |query, visible_indexes| {
460            query.explain_prepared_projection_terminal_with_visible_indexes(
461                visible_indexes,
462                strategy,
463            )
464        })
465    }
466
467    // Validate that one execution strategy is admissible for scalar paged load
468    // execution and fail closed on grouped/primary-key-only routes.
469    fn ensure_scalar_paged_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
470        match family {
471            ExecutionFamily::PrimaryKey => Err(QueryError::invariant(
472                CursorPlanError::cursor_requires_explicit_or_grouped_ordering_message(),
473            )),
474            ExecutionFamily::Ordered => Ok(()),
475            ExecutionFamily::Grouped => Err(QueryError::invariant(
476                "grouped queries execute via execute(), not page().execute()",
477            )),
478        }
479    }
480
481    // Validate that one execution strategy is admissible for the grouped
482    // execution surface.
483    fn ensure_grouped_execution_family(family: ExecutionFamily) -> Result<(), QueryError> {
484        match family {
485            ExecutionFamily::Grouped => Ok(()),
486            ExecutionFamily::PrimaryKey | ExecutionFamily::Ordered => Err(QueryError::invariant(
487                "grouped execution requires grouped logical plans",
488            )),
489        }
490    }
491
492    /// Execute one scalar load/delete query and return materialized response rows.
493    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<EntityResponse<E>, QueryError>
494    where
495        E: PersistedRow<Canister = C> + EntityValue,
496    {
497        // Phase 1: compile typed intent into one prepared execution-plan contract.
498        let mode = query.mode();
499        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
500
501        // Phase 2: delegate execution to the shared compiled-plan entry path.
502        self.execute_query_dyn(mode, plan)
503    }
504
505    /// Execute one typed query while reporting the compile/execute split at
506    /// the shared fluent query seam.
507    #[cfg(feature = "perf-attribution")]
508    #[doc(hidden)]
509    pub fn execute_query_result_with_attribution<E>(
510        &self,
511        query: &Query<E>,
512    ) -> Result<(LoadQueryResult<E>, QueryExecutionAttribution), QueryError>
513    where
514        E: PersistedRow<Canister = C> + EntityValue,
515    {
516        // Phase 1: measure compile work at the typed/fluent boundary.
517        let (compile_local_instructions, compiled) =
518            measure_query_stage(|| self.compile_query_with_visible_indexes(query));
519        let _compiled = compiled?;
520
521        // Phase 2: resolve the shared lower cache and execute one query result.
522        let (execute_local_instructions, result_and_cache) = measure_query_stage(|| {
523            let (plan, cache_attribution) =
524                self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
525
526            if query.has_grouping() {
527                self.execute_grouped_plan_with_trace(plan, None)
528                    .map(|(page, trace)| {
529                        let next_cursor = page
530                            .next_cursor
531                            .map(|token| {
532                                let Some(token) = token.as_grouped() else {
533                                    return Err(
534                                        QueryError::grouped_paged_emitted_scalar_continuation(),
535                                    );
536                                };
537
538                                token.encode().map_err(|err| {
539                                    QueryError::serialize_internal(format!(
540                                        "failed to serialize grouped continuation cursor: {err}"
541                                    ))
542                                })
543                            })
544                            .transpose()?;
545
546                        Ok::<(LoadQueryResult<E>, QueryPlanCacheAttribution), QueryError>((
547                            LoadQueryResult::Grouped(PagedGroupedExecutionWithTrace::new(
548                                page.rows,
549                                next_cursor,
550                                trace,
551                            )),
552                            cache_attribution,
553                        ))
554                    })?
555            } else {
556                self.execute_query_dyn(query.mode(), plan)
557                    .map(LoadQueryResult::Rows)
558                    .map(|result| (result, cache_attribution))
559            }
560        });
561        let (result, cache_attribution) = result_and_cache?;
562        let total_local_instructions =
563            compile_local_instructions.saturating_add(execute_local_instructions);
564
565        Ok((
566            result,
567            QueryExecutionAttribution {
568                compile_local_instructions,
569                execute_local_instructions,
570                total_local_instructions,
571                shared_query_plan_cache_hits: cache_attribution.hits,
572                shared_query_plan_cache_misses: cache_attribution.misses,
573            },
574        ))
575    }
576
577    // Execute one typed query through the unified row/grouped result surface so
578    // higher layers do not need to branch on grouped shape themselves.
579    #[doc(hidden)]
580    pub fn execute_query_result<E>(
581        &self,
582        query: &Query<E>,
583    ) -> Result<LoadQueryResult<E>, QueryError>
584    where
585        E: PersistedRow<Canister = C> + EntityValue,
586    {
587        if query.has_grouping() {
588            return self
589                .execute_grouped(query, None)
590                .map(LoadQueryResult::Grouped);
591        }
592
593        self.execute_query(query).map(LoadQueryResult::Rows)
594    }
595
596    /// Execute one typed delete query and return only the affected-row count.
597    #[doc(hidden)]
598    pub fn execute_delete_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
599    where
600        E: PersistedRow<Canister = C> + EntityValue,
601    {
602        // Phase 1: fail closed if the caller routes a non-delete query here.
603        if !query.mode().is_delete() {
604            return Err(QueryError::unsupported_query(
605                "delete count execution requires delete query mode",
606            ));
607        }
608
609        // Phase 2: compile typed delete intent into one prepared execution-plan contract.
610        let plan = self
611            .compile_query_with_visible_indexes(query)?
612            .into_prepared_execution_plan();
613
614        // Phase 3: execute the shared delete core while skipping response-row materialization.
615        self.with_metrics(|| self.delete_executor::<E>().execute_count(plan))
616            .map_err(QueryError::execute)
617    }
618
619    /// Execute one scalar query from one pre-built prepared execution contract.
620    ///
621    /// This is the shared compiled-plan entry boundary used by the typed
622    /// `execute_query(...)` surface and adjacent query execution facades.
623    pub(in crate::db) fn execute_query_dyn<E>(
624        &self,
625        mode: QueryMode,
626        plan: PreparedExecutionPlan<E>,
627    ) -> Result<EntityResponse<E>, QueryError>
628    where
629        E: PersistedRow<Canister = C> + EntityValue,
630    {
631        let result = match mode {
632            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
633            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
634        };
635
636        result.map_err(QueryError::execute)
637    }
638
639    // Shared load-query terminal wrapper: build plan, run under metrics, map
640    // execution errors into query-facing errors.
641    pub(in crate::db) fn execute_load_query_with<E, T>(
642        &self,
643        query: &Query<E>,
644        op: impl FnOnce(LoadExecutor<E>, PreparedExecutionPlan<E>) -> Result<T, InternalError>,
645    ) -> Result<T, QueryError>
646    where
647        E: PersistedRow<Canister = C> + EntityValue,
648    {
649        let (plan, _) = self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
650
651        self.with_metrics(|| op(self.load_executor::<E>(), plan))
652            .map_err(QueryError::execute)
653    }
654
655    /// Build one trace payload for a query without executing it.
656    ///
657    /// This lightweight surface is intended for developer diagnostics:
658    /// plan hash, access strategy summary, and planner/executor route shape.
659    pub fn trace_query<E>(&self, query: &Query<E>) -> Result<QueryTracePlan, QueryError>
660    where
661        E: EntityKind<Canister = C>,
662    {
663        let compiled = self.compile_query_with_visible_indexes(query)?;
664        let explain = compiled.explain();
665        let plan_hash = compiled.plan_hash_hex();
666
667        let (executable, _) =
668            self.cached_prepared_query_plan_for_entity::<E>(query.structural())?;
669        let access_strategy = AccessStrategy::from_plan(executable.access()).debug_summary();
670        let execution_family = match query.mode() {
671            QueryMode::Load(_) => Some(executable.execution_family().map_err(QueryError::execute)?),
672            QueryMode::Delete(_) => None,
673        };
674
675        Ok(QueryTracePlan::new(
676            plan_hash,
677            access_strategy,
678            execution_family,
679            explain,
680        ))
681    }
682
683    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
684    pub(crate) fn execute_load_query_paged_with_trace<E>(
685        &self,
686        query: &Query<E>,
687        cursor_token: Option<&str>,
688    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
689    where
690        E: PersistedRow<Canister = C> + EntityValue,
691    {
692        // Phase 1: build/validate prepared execution plan and reject grouped plans.
693        let plan = self
694            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
695            .0;
696        Self::ensure_scalar_paged_execution_family(
697            plan.execution_family().map_err(QueryError::execute)?,
698        )?;
699
700        // Phase 2: decode external cursor token and validate it against plan surface.
701        let cursor_bytes = decode_optional_cursor_token(cursor_token)
702            .map_err(QueryError::from_cursor_plan_error)?;
703        let cursor = plan
704            .prepare_cursor(cursor_bytes.as_deref())
705            .map_err(QueryError::from_executor_plan_error)?;
706
707        // Phase 3: execute one traced page and encode outbound continuation token.
708        let (page, trace) = self
709            .with_metrics(|| {
710                self.load_executor::<E>()
711                    .execute_paged_with_cursor_traced(plan, cursor)
712            })
713            .map_err(QueryError::execute)?;
714        let next_cursor = page
715            .next_cursor
716            .map(|token| {
717                let Some(token) = token.as_scalar() else {
718                    return Err(QueryError::scalar_paged_emitted_grouped_continuation());
719                };
720
721                token.encode().map_err(|err| {
722                    QueryError::serialize_internal(format!(
723                        "failed to serialize continuation cursor: {err}"
724                    ))
725                })
726            })
727            .transpose()?;
728
729        Ok(PagedLoadExecutionWithTrace::new(
730            page.items,
731            next_cursor,
732            trace,
733        ))
734    }
735
736    /// Execute one grouped query page with optional grouped continuation cursor.
737    ///
738    /// This is the explicit grouped execution boundary; scalar load APIs reject
739    /// grouped plans to preserve scalar response contracts.
740    pub(in crate::db) fn execute_grouped<E>(
741        &self,
742        query: &Query<E>,
743        cursor_token: Option<&str>,
744    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
745    where
746        E: PersistedRow<Canister = C> + EntityValue,
747    {
748        let (page, trace) = self.execute_grouped_page_with_trace(query, cursor_token)?;
749        let next_cursor = page
750            .next_cursor
751            .map(|token| {
752                let Some(token) = token.as_grouped() else {
753                    return Err(QueryError::grouped_paged_emitted_scalar_continuation());
754                };
755
756                token.encode().map_err(|err| {
757                    QueryError::serialize_internal(format!(
758                        "failed to serialize grouped continuation cursor: {err}"
759                    ))
760                })
761            })
762            .transpose()?;
763
764        Ok(PagedGroupedExecutionWithTrace::new(
765            page.rows,
766            next_cursor,
767            trace,
768        ))
769    }
770
771    // Execute the canonical grouped query core and return the raw grouped page
772    // plus optional execution trace before outward cursor formatting.
773    fn execute_grouped_page_with_trace<E>(
774        &self,
775        query: &Query<E>,
776        cursor_token: Option<&str>,
777    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
778    where
779        E: PersistedRow<Canister = C> + EntityValue,
780    {
781        // Phase 1: build the prepared execution plan once from the typed query.
782        let plan = self
783            .cached_prepared_query_plan_for_entity::<E>(query.structural())?
784            .0;
785
786        // Phase 2: reuse the shared prepared grouped execution path.
787        self.execute_grouped_plan_with_trace(plan, cursor_token)
788    }
789
790    // Execute one grouped prepared plan page with optional grouped cursor.
791    fn execute_grouped_plan_with_trace<E>(
792        &self,
793        plan: PreparedExecutionPlan<E>,
794        cursor_token: Option<&str>,
795    ) -> Result<(GroupedCursorPage, Option<ExecutionTrace>), QueryError>
796    where
797        E: PersistedRow<Canister = C> + EntityValue,
798    {
799        // Phase 1: validate the prepared plan shape before decoding cursors.
800        Self::ensure_grouped_execution_family(
801            plan.execution_family().map_err(QueryError::execute)?,
802        )?;
803
804        // Phase 2: decode external grouped cursor token and validate against plan.
805        let cursor = decode_optional_grouped_cursor_token(cursor_token)
806            .map_err(QueryError::from_cursor_plan_error)?;
807        let cursor = plan
808            .prepare_grouped_cursor_token(cursor)
809            .map_err(QueryError::from_executor_plan_error)?;
810
811        // Phase 3: execute one grouped page while preserving the structural
812        // grouped cursor payload for whichever outward cursor format the caller needs.
813        self.with_metrics(|| {
814            self.load_executor::<E>()
815                .execute_grouped_paged_with_cursor_traced(plan, cursor)
816        })
817        .map_err(QueryError::execute)
818    }
819}